From a6279d99056ff63fae56bc2805ee48cc5c99d840 Mon Sep 17 00:00:00 2001 From: Dmitry Anoshin Date: Fri, 22 Dec 2023 16:35:46 -0800 Subject: [PATCH] [exporterhelper] Add WithRequestQueue option to the exporter The new configuration interface for the end users provides a new `queue_size_items` option to limit the queue by a number of spans, log records, or metric data points. The previous way to limit the queue by number of requests is preserved under the same field, `queue_size,` which will later be deprecated through a longer transition process. --- .chloggen/exporter-helper-v2.yaml | 32 +++ exporter/exporterhelper/common.go | 45 ++++- exporter/exporterhelper/logs.go | 9 +- exporter/exporterhelper/logs_test.go | 4 +- exporter/exporterhelper/metrics.go | 9 +- exporter/exporterhelper/metrics_test.go | 4 +- exporter/exporterhelper/queue_sender.go | 38 +--- exporter/exporterhelper/queue_sender_test.go | 189 ++++++++++++------ exporter/exporterhelper/request.go | 6 +- exporter/exporterhelper/retry_sender.go | 4 +- exporter/exporterhelper/retry_sender_test.go | 3 +- exporter/exporterhelper/traces.go | 9 +- exporter/exporterhelper/traces_test.go | 4 +- exporter/exporterqueue/config.go | 61 ++++++ exporter/exporterqueue/config_test.go | 26 +++ exporter/exporterqueue/queue.go | 96 +++++++++ .../queue}/bounded_memory_queue.go | 2 +- .../queue}/bounded_memory_queue_test.go | 2 +- .../internal => internal/queue}/consumers.go | 12 +- .../internal => internal/queue}/err.go | 2 +- .../queue}/mock_storage.go | 2 +- .../queue}/package_test.go | 2 +- .../queue}/persistent_queue.go | 2 +- .../queue}/persistent_queue_test.go | 2 +- .../internal => internal/queue}/queue.go | 2 +- .../queue}/queue_capacity.go | 2 +- .../queue}/queue_capacity_test.go | 2 +- 27 files changed, 434 insertions(+), 137 deletions(-) create mode 100644 .chloggen/exporter-helper-v2.yaml create mode 100644 exporter/exporterqueue/config.go create mode 100644 exporter/exporterqueue/config_test.go create mode 100644 exporter/exporterqueue/queue.go rename exporter/{exporterhelper/internal => internal/queue}/bounded_memory_queue.go (96%) rename exporter/{exporterhelper/internal => internal/queue}/bounded_memory_queue_test.go (99%) rename exporter/{exporterhelper/internal => internal/queue}/consumers.go (73%) rename exporter/{exporterhelper/internal => internal/queue}/err.go (78%) rename exporter/{exporterhelper/internal => internal/queue}/mock_storage.go (98%) rename exporter/{exporterhelper/internal => internal/queue}/package_test.go (91%) rename exporter/{exporterhelper/internal => internal/queue}/persistent_queue.go (99%) rename exporter/{exporterhelper/internal => internal/queue}/persistent_queue_test.go (99%) rename exporter/{exporterhelper/internal => internal/queue}/queue.go (93%) rename exporter/{exporterhelper/internal => internal/queue}/queue_capacity.go (94%) rename exporter/{exporterhelper/internal => internal/queue}/queue_capacity_test.go (98%) diff --git a/.chloggen/exporter-helper-v2.yaml b/.chloggen/exporter-helper-v2.yaml new file mode 100644 index 00000000000..368da4ba2db --- /dev/null +++ b/.chloggen/exporter-helper-v2.yaml @@ -0,0 +1,32 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporter/exporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add API for enabling queue in the new exporter helpers. + +# One or more tracking issues or pull requests related to the change +issues: [7874] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + The following experimental API is introduced in exporter package: + - `exporterhelper.WithRequestQueue`: a new exporter helper option for using a queue. + - `exporterqueue.Queue`: an interface for queue implementations. + - `exporterqueue.Factory`: a queue factory interface, implementations of this interface are intended to be used with WithRequestQueue option. + - `exporterqueue.Settings`: queue factory settings. + - `exporterqueue.Config`: common configuration for queue implementations. + - `exporterqueue.NewDefaultConfig`: a function for creating a default queue configuration. + - `exporterqueue.NewMemoryQueueFactory`: a new factory for creating a memory queue. + - `exporterqueue.NewPersistentQueueFactory: a factory for creating a persistent queue. + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go index b5e7aa39a33..f1080bdee04 100644 --- a/exporter/exporterhelper/common.go +++ b/exporter/exporterhelper/common.go @@ -13,6 +13,7 @@ import ( "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/exporterqueue" ) // requestSender is an abstraction of a sender for a request independent of the type of the data (traces, metrics, logs). @@ -85,17 +86,43 @@ func WithRetry(config configretry.BackOffConfig) Option { func WithQueue(config QueueSettings) Option { return func(o *baseExporter) { if o.requestExporter { - panic("queueing is not available for the new request exporters yet") + panic("WithQueue option is not available for the new request exporters, use WithRequestQueue instead") } if !config.Enabled { o.exportFailureMessage += " Try enabling sending_queue to survive temporary failures." return } - consumeErrHandler := func(err error, req Request) { - o.set.Logger.Error("Exporting failed. Dropping data."+o.exportFailureMessage, - zap.Error(err), zap.Int("dropped_items", req.ItemsCount())) + qf := exporterqueue.NewPersistentQueueFactory[Request](config.StorageID, exporterqueue.PersistentQueueSettings[Request]{ + Marshaler: o.marshaler, + Unmarshaler: o.unmarshaler, + }) + q := qf(context.Background(), exporterqueue.Settings{ + DataType: o.signal, + ExporterSettings: o.set, + }, exporterqueue.Config{ + Enabled: config.Enabled, + NumConsumers: config.NumConsumers, + QueueSize: config.QueueSize, + }) + o.queueSender = newQueueSender(q, o.set, config.NumConsumers, o.exportFailureMessage) + } +} + +// WithRequestQueue enables queueing for an exporter. +// This option should be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +func WithRequestQueue(cfg exporterqueue.Config, queueFactory exporterqueue.Factory[Request]) Option { + return func(o *baseExporter) { + if !cfg.Enabled { + o.exportFailureMessage += " Try enabling sending_queue to survive temporary failures." + return + } + set := exporterqueue.Settings{ + DataType: o.signal, + ExporterSettings: o.set, } - o.queueSender = newQueueSender(config, o.set, o.signal, o.marshaler, o.unmarshaler, consumeErrHandler) + o.queueSender = newQueueSender(queueFactory(context.Background(), set, cfg), o.set, cfg.NumConsumers, o.exportFailureMessage) } } @@ -114,8 +141,8 @@ type baseExporter struct { component.ShutdownFunc requestExporter bool - marshaler RequestMarshaler - unmarshaler RequestUnmarshaler + marshaler exporterqueue.Marshaler[Request] + unmarshaler exporterqueue.Unmarshaler[Request] signal component.DataType set exporter.CreateSettings @@ -136,8 +163,8 @@ type baseExporter struct { } // TODO: requestExporter, marshaler, and unmarshaler arguments can be removed when the old exporter helpers will be updated to call the new ones. -func newBaseExporter(set exporter.CreateSettings, signal component.DataType, requestExporter bool, marshaler RequestMarshaler, - unmarshaler RequestUnmarshaler, osf obsrepSenderFactory, options ...Option) (*baseExporter, error) { +func newBaseExporter(set exporter.CreateSettings, signal component.DataType, requestExporter bool, + marshaler exporterqueue.Marshaler[Request], unmarshaler exporterqueue.Unmarshaler[Request], osf obsrepSenderFactory, options ...Option) (*baseExporter, error) { obsReport, err := NewObsReport(ObsReportSettings{ExporterID: set.ID, ExporterCreateSettings: set}) if err != nil { diff --git a/exporter/exporterhelper/logs.go b/exporter/exporterhelper/logs.go index f08bf0e6da6..466b55ed9bb 100644 --- a/exporter/exporterhelper/logs.go +++ b/exporter/exporterhelper/logs.go @@ -13,7 +13,8 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + "go.opentelemetry.io/collector/exporter/exporterqueue" + "go.opentelemetry.io/collector/exporter/internal/queue" "go.opentelemetry.io/collector/pdata/plog" ) @@ -32,7 +33,7 @@ func newLogsRequest(ld plog.Logs, pusher consumer.ConsumeLogsFunc) Request { } } -func newLogsRequestUnmarshalerFunc(pusher consumer.ConsumeLogsFunc) RequestUnmarshaler { +func newLogsRequestUnmarshalerFunc(pusher consumer.ConsumeLogsFunc) exporterqueue.Unmarshaler[Request] { return func(bytes []byte) (Request, error) { logs, err := logsUnmarshaler.UnmarshalLogs(bytes) if err != nil { @@ -96,7 +97,7 @@ func NewLogsExporter( lc, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error { req := newLogsRequest(ld, pusher) serr := be.send(ctx, req) - if errors.Is(serr, internal.ErrQueueIsFull) { + if errors.Is(serr, queue.ErrQueueIsFull) { be.obsrep.recordEnqueueFailure(ctx, component.DataTypeLogs, int64(req.ItemsCount())) } return serr @@ -144,7 +145,7 @@ func NewLogsRequestExporter( return consumererror.NewPermanent(cErr) } sErr := be.send(ctx, req) - if errors.Is(sErr, internal.ErrQueueIsFull) { + if errors.Is(sErr, queue.ErrQueueIsFull) { be.obsrep.recordEnqueueFailure(ctx, component.DataTypeLogs, int64(req.ItemsCount())) } return sErr diff --git a/exporter/exporterhelper/logs_test.go b/exporter/exporterhelper/logs_test.go index 5349f767398..7fa5731890a 100644 --- a/exporter/exporterhelper/logs_test.go +++ b/exporter/exporterhelper/logs_test.go @@ -25,8 +25,8 @@ import ( "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/exporter/exporterhelper/internal" "go.opentelemetry.io/collector/exporter/exportertest" + "go.opentelemetry.io/collector/exporter/internal/queue" "go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics" "go.opentelemetry.io/collector/internal/testdata" "go.opentelemetry.io/collector/pdata/plog" @@ -167,7 +167,7 @@ func TestLogsExporter_WithPersistentQueue(t *testing.T) { require.NoError(t, err) host := &mockHost{ext: map[component.ID]component.Component{ - storageID: internal.NewMockStorageExtension(nil), + storageID: queue.NewMockStorageExtension(nil), }} require.NoError(t, te.Start(context.Background(), host)) t.Cleanup(func() { require.NoError(t, te.Shutdown(context.Background())) }) diff --git a/exporter/exporterhelper/metrics.go b/exporter/exporterhelper/metrics.go index 7360d548962..bdb284bb11a 100644 --- a/exporter/exporterhelper/metrics.go +++ b/exporter/exporterhelper/metrics.go @@ -13,7 +13,8 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + "go.opentelemetry.io/collector/exporter/exporterqueue" + "go.opentelemetry.io/collector/exporter/internal/queue" "go.opentelemetry.io/collector/pdata/pmetric" ) @@ -32,7 +33,7 @@ func newMetricsRequest(md pmetric.Metrics, pusher consumer.ConsumeMetricsFunc) R } } -func newMetricsRequestUnmarshalerFunc(pusher consumer.ConsumeMetricsFunc) RequestUnmarshaler { +func newMetricsRequestUnmarshalerFunc(pusher consumer.ConsumeMetricsFunc) exporterqueue.Unmarshaler[Request] { return func(bytes []byte) (Request, error) { metrics, err := metricsUnmarshaler.UnmarshalMetrics(bytes) if err != nil { @@ -96,7 +97,7 @@ func NewMetricsExporter( mc, err := consumer.NewMetrics(func(ctx context.Context, md pmetric.Metrics) error { req := newMetricsRequest(md, pusher) serr := be.send(ctx, req) - if errors.Is(serr, internal.ErrQueueIsFull) { + if errors.Is(serr, queue.ErrQueueIsFull) { be.obsrep.recordEnqueueFailure(ctx, component.DataTypeMetrics, int64(req.ItemsCount())) } return serr @@ -144,7 +145,7 @@ func NewMetricsRequestExporter( return consumererror.NewPermanent(cErr) } sErr := be.send(ctx, req) - if errors.Is(sErr, internal.ErrQueueIsFull) { + if errors.Is(sErr, queue.ErrQueueIsFull) { be.obsrep.recordEnqueueFailure(ctx, component.DataTypeMetrics, int64(req.ItemsCount())) } return sErr diff --git a/exporter/exporterhelper/metrics_test.go b/exporter/exporterhelper/metrics_test.go index 65125b4469c..cafbeaaad07 100644 --- a/exporter/exporterhelper/metrics_test.go +++ b/exporter/exporterhelper/metrics_test.go @@ -25,8 +25,8 @@ import ( "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/exporter/exporterhelper/internal" "go.opentelemetry.io/collector/exporter/exportertest" + "go.opentelemetry.io/collector/exporter/internal/queue" "go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics" "go.opentelemetry.io/collector/internal/testdata" "go.opentelemetry.io/collector/pdata/pmetric" @@ -168,7 +168,7 @@ func TestMetricsExporter_WithPersistentQueue(t *testing.T) { require.NoError(t, err) host := &mockHost{ext: map[component.ID]component.Component{ - storageID: internal.NewMockStorageExtension(nil), + storageID: queue.NewMockStorageExtension(nil), }} require.NoError(t, te.Start(context.Background(), host)) t.Cleanup(func() { require.NoError(t, te.Shutdown(context.Background())) }) diff --git a/exporter/exporterhelper/queue_sender.go b/exporter/exporterhelper/queue_sender.go index 0f7a84fad29..3092c60c98f 100644 --- a/exporter/exporterhelper/queue_sender.go +++ b/exporter/exporterhelper/queue_sender.go @@ -16,7 +16,8 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + "go.opentelemetry.io/collector/exporter/exporterqueue" + "go.opentelemetry.io/collector/exporter/internal/queue" "go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics" ) @@ -71,41 +72,21 @@ func (qCfg *QueueSettings) Validate() error { type queueSender struct { baseRequestSender fullName string - queue internal.Queue[Request] + queue exporterqueue.Queue[Request] traceAttribute attribute.KeyValue logger *zap.Logger meter otelmetric.Meter - consumers *internal.QueueConsumers[Request] + consumers *queue.Consumers[Request] metricCapacity otelmetric.Int64ObservableGauge metricSize otelmetric.Int64ObservableGauge } -func newQueueSender(config QueueSettings, set exporter.CreateSettings, signal component.DataType, - marshaler RequestMarshaler, unmarshaler RequestUnmarshaler, consumeErrHandler func(error, Request)) *queueSender { - - isPersistent := config.StorageID != nil - var queue internal.Queue[Request] - queueSizer := &internal.RequestSizer[Request]{} - if isPersistent { - queue = internal.NewPersistentQueue[Request](internal.PersistentQueueSettings[Request]{ - Sizer: queueSizer, - Capacity: config.QueueSize, - DataType: signal, - StorageID: *config.StorageID, - Marshaler: marshaler, - Unmarshaler: unmarshaler, - ExporterSettings: set, - }) - } else { - queue = internal.NewBoundedMemoryQueue[Request](internal.MemoryQueueSettings[Request]{ - Sizer: queueSizer, - Capacity: config.QueueSize, - }) - } +func newQueueSender(q exporterqueue.Queue[Request], set exporter.CreateSettings, numConsumers int, + exportFailureMessage string) *queueSender { qs := &queueSender{ fullName: set.ID.String(), - queue: queue, + queue: q, traceAttribute: attribute.String(obsmetrics.ExporterKey, set.ID.String()), logger: set.TelemetrySettings.Logger, meter: set.TelemetrySettings.MeterProvider.Meter(scopeName), @@ -113,11 +94,12 @@ func newQueueSender(config QueueSettings, set exporter.CreateSettings, signal co consumeFunc := func(ctx context.Context, req Request) error { err := qs.nextSender.send(ctx, req) if err != nil { - consumeErrHandler(err, req) + set.Logger.Error("Exporting failed. Dropping data."+exportFailureMessage, + zap.Error(err), zap.Int("dropped_items", req.ItemsCount())) } return err } - qs.consumers = internal.NewQueueConsumers(queue, config.NumConsumers, consumeFunc) + qs.consumers = queue.NewQueueConsumers[Request](q, numConsumers, consumeFunc) return qs } diff --git a/exporter/exporterhelper/queue_sender_test.go b/exporter/exporterhelper/queue_sender_test.go index 58da1190e9a..598b63a3f9d 100644 --- a/exporter/exporterhelper/queue_sender_test.go +++ b/exporter/exporterhelper/queue_sender_test.go @@ -18,8 +18,9 @@ import ( "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + "go.opentelemetry.io/collector/exporter/exporterqueue" "go.opentelemetry.io/collector/exporter/exportertest" + "go.opentelemetry.io/collector/exporter/internal/queue" ) func TestQueuedRetry_StopWhileWaiting(t *testing.T) { @@ -101,41 +102,85 @@ func TestQueuedRetry_RejectOnFull(t *testing.T) { } func TestQueuedRetryHappyPath(t *testing.T) { - tt, err := componenttest.SetupTelemetry(defaultID) - require.NoError(t, err) - t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - - qCfg := NewDefaultQueueSettings() - rCfg := configretry.NewDefaultBackOffConfig() - set := exporter.CreateSettings{ID: defaultID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()} - be, err := newBaseExporter(set, defaultType, false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) - require.NoError(t, err) - ocs := be.obsrepSender.(*observabilityConsumerSender) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - assert.NoError(t, be.Shutdown(context.Background())) - }) - - wantRequests := 10 - reqs := make([]*mockRequest, 0, 10) - for i := 0; i < wantRequests; i++ { - ocs.run(func() { - req := newMockRequest(2, nil) - reqs = append(reqs, req) - require.NoError(t, be.send(context.Background(), req)) - }) + tests := []struct { + name string + queueOption Option + }{ + { + name: "WithQueue", + queueOption: WithQueue(QueueSettings{ + Enabled: true, + QueueSize: 10, + NumConsumers: 1, + }), + }, + { + name: "WithRequestQueue/MemoryQueueFactory", + queueOption: WithRequestQueue(exporterqueue.Config{ + Enabled: true, + QueueSize: 10, + NumConsumers: 1, + }, exporterqueue.NewMemoryQueueFactory[Request]()), + }, + { + name: "WithRequestQueue/PersistentQueueFactory", + queueOption: WithRequestQueue(exporterqueue.Config{ + Enabled: true, + QueueSize: 10, + NumConsumers: 1, + }, exporterqueue.NewPersistentQueueFactory[Request](nil, exporterqueue.PersistentQueueSettings[Request]{})), + }, + { + name: "WithRequestQueue/PersistentQueueFactory/RequestsLimit", + queueOption: WithRequestQueue(exporterqueue.Config{ + Enabled: true, + QueueSize: 10, + NumConsumers: 1, + }, exporterqueue.NewPersistentQueueFactory[Request](nil, exporterqueue.PersistentQueueSettings[Request]{})), + }, } - - // Wait until all batches received - ocs.awaitAsyncProcessing() - - require.Len(t, reqs, wantRequests) - for _, req := range reqs { - req.checkNumRequests(t, 1) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tel, err := componenttest.SetupTelemetry(defaultID) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tel.Shutdown(context.Background())) }) + + rCfg := configretry.NewDefaultBackOffConfig() + set := exporter.CreateSettings{ID: defaultID, TelemetrySettings: tel.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()} + be, err := newBaseExporter(set, defaultType, false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), tt.queueOption) + require.NoError(t, err) + ocs := be.obsrepSender.(*observabilityConsumerSender) + + wantRequests := 10 + reqs := make([]*mockRequest, 0, 10) + for i := 0; i < wantRequests; i++ { + ocs.run(func() { + req := newMockRequest(2, nil) + reqs = append(reqs, req) + require.NoError(t, be.send(context.Background(), req)) + }) + } + + // expect queue to be full + require.Error(t, be.send(context.Background(), newMockRequest(2, nil))) + + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + assert.NoError(t, be.Shutdown(context.Background())) + }) + + // Wait until all batches received + ocs.awaitAsyncProcessing() + + require.Len(t, reqs, wantRequests) + for _, req := range reqs { + req.checkNumRequests(t, 1) + } + + ocs.checkSendItemsCount(t, 2*wantRequests) + ocs.checkDroppedItemsCount(t, 0) + }) } - - ocs.checkSendItemsCount(t, 2*wantRequests) - ocs.checkDroppedItemsCount(t, 0) } func TestQueuedRetry_QueueMetricsReported(t *testing.T) { tt, err := componenttest.SetupTelemetry(defaultID) @@ -193,27 +238,52 @@ func TestQueueSettings_Validate(t *testing.T) { } func TestQueueRetryWithDisabledQueue(t *testing.T) { - qs := NewDefaultQueueSettings() - qs.Enabled = false - set := exportertest.NewNopCreateSettings() - logger, observed := observer.New(zap.ErrorLevel) - set.Logger = zap.New(logger) - be, err := newBaseExporter(set, component.DataTypeLogs, false, nil, nil, newObservabilityConsumerSender, - WithQueue(qs)) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - ocs := be.obsrepSender.(*observabilityConsumerSender) - mockR := newMockRequest(2, errors.New("some error")) - ocs.run(func() { - require.Error(t, be.send(context.Background(), mockR)) - }) - assert.Len(t, observed.All(), 1) - assert.Equal(t, "Exporting failed. Rejecting data. Try enabling sending_queue to survive temporary failures.", observed.All()[0].Message) - ocs.awaitAsyncProcessing() - mockR.checkNumRequests(t, 1) - ocs.checkSendItemsCount(t, 0) - ocs.checkDroppedItemsCount(t, 2) - require.NoError(t, be.Shutdown(context.Background())) + tests := []struct { + name string + queueOption Option + }{ + { + name: "WithQueue", + queueOption: func() Option { + qs := NewDefaultQueueSettings() + qs.Enabled = false + return WithQueue(qs) + }(), + }, + { + name: "WithRequestQueue", + queueOption: func() Option { + qs := exporterqueue.NewDefaultConfig() + qs.Enabled = false + return WithRequestQueue(qs, exporterqueue.NewMemoryQueueFactory[Request]()) + }(), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + set := exportertest.NewNopCreateSettings() + logger, observed := observer.New(zap.ErrorLevel) + set.Logger = zap.New(logger) + be, err := newBaseExporter(set, component.DataTypeLogs, false, nil, nil, newObservabilityConsumerSender, + tt.queueOption) + require.NoError(t, err) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + ocs := be.obsrepSender.(*observabilityConsumerSender) + mockR := newMockRequest(2, errors.New("some error")) + ocs.run(func() { + require.Error(t, be.send(context.Background(), mockR)) + }) + assert.Len(t, observed.All(), 1) + assert.Equal(t, "Exporting failed. Rejecting data. Try enabling sending_queue to survive temporary failures.", observed.All()[0].Message) + ocs.awaitAsyncProcessing() + mockR.checkNumRequests(t, 1) + ocs.checkSendItemsCount(t, 0) + ocs.checkDroppedItemsCount(t, 2) + require.NoError(t, be.Shutdown(context.Background())) + }) + } + } func TestQueueFailedRequestDropped(t *testing.T) { @@ -245,7 +315,7 @@ func TestQueuedRetryPersistenceEnabled(t *testing.T) { require.NoError(t, err) var extensions = map[component.ID]component.Component{ - storageID: internal.NewMockStorageExtension(nil), + storageID: queue.NewMockStorageExtension(nil), } host := &mockHost{ext: extensions} @@ -269,7 +339,7 @@ func TestQueuedRetryPersistenceEnabledStorageError(t *testing.T) { require.NoError(t, err) var extensions = map[component.ID]component.Component{ - storageID: internal.NewMockStorageExtension(storageError), + storageID: queue.NewMockStorageExtension(storageError), } host := &mockHost{ext: extensions} @@ -293,7 +363,7 @@ func TestQueuedRetryPersistentEnabled_NoDataLossOnShutdown(t *testing.T) { require.NoError(t, err) var extensions = map[component.ID]component.Component{ - storageID: internal.NewMockStorageExtension(nil), + storageID: queue.NewMockStorageExtension(nil), } host := &mockHost{ext: extensions} @@ -323,7 +393,8 @@ func TestQueuedRetryPersistentEnabled_NoDataLossOnShutdown(t *testing.T) { } func TestQueueSenderNoStartShutdown(t *testing.T) { - qs := newQueueSender(NewDefaultQueueSettings(), exportertest.NewNopCreateSettings(), defaultType, nil, nil, nil) + queue := queue.NewBoundedMemoryQueue[Request](queue.MemoryQueueSettings[Request]{}) + qs := newQueueSender(queue, exportertest.NewNopCreateSettings(), 1, "") assert.NoError(t, qs.Shutdown(context.Background())) } diff --git a/exporter/exporterhelper/request.go b/exporter/exporterhelper/request.go index c29da3a10a3..03276f9c19e 100644 --- a/exporter/exporterhelper/request.go +++ b/exporter/exporterhelper/request.go @@ -33,13 +33,11 @@ type RequestErrorHandler interface { } // RequestMarshaler is a function that can marshal a Request into bytes. -// This API is at the early stage of development and may change without backward compatibility -// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +// Deprecated: [v0.94.0] Use exporterqueue.Marshaler[Request] instead. type RequestMarshaler func(req Request) ([]byte, error) // RequestUnmarshaler is a function that can unmarshal bytes into a Request. -// This API is at the early stage of development and may change without backward compatibility -// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +// Deprecated: [v0.94.0] Use exporterqueue.Unmarshaler[Request] instead. type RequestUnmarshaler func(data []byte) (Request, error) // extractPartialRequest returns a new Request that may contain the items left to be sent diff --git a/exporter/exporterhelper/retry_sender.go b/exporter/exporterhelper/retry_sender.go index 1bf24157898..c6055df0673 100644 --- a/exporter/exporterhelper/retry_sender.go +++ b/exporter/exporterhelper/retry_sender.go @@ -17,7 +17,7 @@ import ( "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + "go.opentelemetry.io/collector/exporter/internal/queue" "go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics" ) @@ -127,7 +127,7 @@ func (rs *retrySender) send(ctx context.Context, req Request) error { case <-ctx.Done(): return fmt.Errorf("request is cancelled or timed out %w", err) case <-rs.stopCh: - return internal.NewShutdownErr(err) + return queue.NewShutdownErr(err) case <-time.After(backoffDelay): } } diff --git a/exporter/exporterhelper/retry_sender_test.go b/exporter/exporterhelper/retry_sender_test.go index b26c465e1dc..319b1cfdcbe 100644 --- a/exporter/exporterhelper/retry_sender_test.go +++ b/exporter/exporterhelper/retry_sender_test.go @@ -20,11 +20,12 @@ import ( "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/consumer/consumererror" + "go.opentelemetry.io/collector/exporter/exporterqueue" "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/internal/testdata" ) -func mockRequestUnmarshaler(mr Request) RequestUnmarshaler { +func mockRequestUnmarshaler(mr Request) exporterqueue.Unmarshaler[Request] { return func(bytes []byte) (Request, error) { return mr, nil } diff --git a/exporter/exporterhelper/traces.go b/exporter/exporterhelper/traces.go index 53c6533cc2d..806ea2d485c 100644 --- a/exporter/exporterhelper/traces.go +++ b/exporter/exporterhelper/traces.go @@ -13,7 +13,8 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + "go.opentelemetry.io/collector/exporter/exporterqueue" + "go.opentelemetry.io/collector/exporter/internal/queue" "go.opentelemetry.io/collector/pdata/ptrace" ) @@ -32,7 +33,7 @@ func newTracesRequest(td ptrace.Traces, pusher consumer.ConsumeTracesFunc) Reque } } -func newTraceRequestUnmarshalerFunc(pusher consumer.ConsumeTracesFunc) RequestUnmarshaler { +func newTraceRequestUnmarshalerFunc(pusher consumer.ConsumeTracesFunc) exporterqueue.Unmarshaler[Request] { return func(bytes []byte) (Request, error) { traces, err := tracesUnmarshaler.UnmarshalTraces(bytes) if err != nil { @@ -96,7 +97,7 @@ func NewTracesExporter( tc, err := consumer.NewTraces(func(ctx context.Context, td ptrace.Traces) error { req := newTracesRequest(td, pusher) serr := be.send(ctx, req) - if errors.Is(serr, internal.ErrQueueIsFull) { + if errors.Is(serr, queue.ErrQueueIsFull) { be.obsrep.recordEnqueueFailure(ctx, component.DataTypeTraces, int64(req.ItemsCount())) } return serr @@ -144,7 +145,7 @@ func NewTracesRequestExporter( return consumererror.NewPermanent(cErr) } sErr := be.send(ctx, req) - if errors.Is(sErr, internal.ErrQueueIsFull) { + if errors.Is(sErr, queue.ErrQueueIsFull) { be.obsrep.recordEnqueueFailure(ctx, component.DataTypeTraces, int64(req.ItemsCount())) } return sErr diff --git a/exporter/exporterhelper/traces_test.go b/exporter/exporterhelper/traces_test.go index be5a45ac30d..e7e605204bd 100644 --- a/exporter/exporterhelper/traces_test.go +++ b/exporter/exporterhelper/traces_test.go @@ -25,8 +25,8 @@ import ( "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/exporter/exporterhelper/internal" "go.opentelemetry.io/collector/exporter/exportertest" + "go.opentelemetry.io/collector/exporter/internal/queue" "go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics" "go.opentelemetry.io/collector/internal/testdata" "go.opentelemetry.io/collector/pdata/ptrace" @@ -165,7 +165,7 @@ func TestTracesExporter_WithPersistentQueue(t *testing.T) { require.NoError(t, err) host := &mockHost{ext: map[component.ID]component.Component{ - storageID: internal.NewMockStorageExtension(nil), + storageID: queue.NewMockStorageExtension(nil), }} require.NoError(t, te.Start(context.Background(), host)) t.Cleanup(func() { require.NoError(t, te.Shutdown(context.Background())) }) diff --git a/exporter/exporterqueue/config.go b/exporter/exporterqueue/config.go new file mode 100644 index 00000000000..ac888a0c227 --- /dev/null +++ b/exporter/exporterqueue/config.go @@ -0,0 +1,61 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exporterqueue // import "go.opentelemetry.io/collector/exporter/exporterqueue" + +import ( + "errors" + + "go.opentelemetry.io/collector/component" +) + +// Config defines configuration for queueing requests before exporting. +// It's supposed to be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type Config struct { + // Enabled indicates whether to not enqueue batches before exporting. + Enabled bool `mapstructure:"enabled"` + // NumConsumers is the number of consumers from the queue. + NumConsumers int `mapstructure:"num_consumers"` + // QueueSize is the maximum number of requests allowed in queue at any given time. + QueueSize int `mapstructure:"queue_size"` +} + +// NewDefaultConfig returns the default Config. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +func NewDefaultConfig() Config { + return Config{ + Enabled: true, + NumConsumers: 10, + QueueSize: 1_000, + } +} + +// Validate checks if the QueueSettings configuration is valid +func (qCfg *Config) Validate() error { + if !qCfg.Enabled { + return nil + } + if qCfg.NumConsumers <= 0 { + return errors.New("number of consumers must be positive") + } + if qCfg.QueueSize <= 0 { + return errors.New("queue size must be positive") + } + return nil +} + +// PersistentQueueConfig defines configuration for queueing requests in a persistent storage. +// The struct is provided to be added in the exporter configuration as one struct under the "sending_queue" key. +// The exporter helper Go interface requires the fields to be provided separately to WithRequestQueue and +// NewPersistentQueueFactory. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type PersistentQueueConfig struct { + Config `mapstructure:",squash"` + // StorageID if not empty, enables the persistent storage and uses the component specified + // as a storage extension for the persistent queue + StorageID *component.ID `mapstructure:"storage"` +} diff --git a/exporter/exporterqueue/config_test.go b/exporter/exporterqueue/config_test.go new file mode 100644 index 00000000000..c1b43ba5f8e --- /dev/null +++ b/exporter/exporterqueue/config_test.go @@ -0,0 +1,26 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exporterqueue + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestQueueConfig_Validate(t *testing.T) { + qCfg := NewDefaultConfig() + assert.NoError(t, qCfg.Validate()) + + qCfg.NumConsumers = 0 + assert.EqualError(t, qCfg.Validate(), "number of consumers must be positive") + + qCfg = NewDefaultConfig() + qCfg.QueueSize = 0 + assert.EqualError(t, qCfg.Validate(), "queue size must be positive") + + // Confirm Validate doesn't return error with invalid config when feature is disabled + qCfg.Enabled = false + assert.NoError(t, qCfg.Validate()) +} diff --git a/exporter/exporterqueue/queue.go b/exporter/exporterqueue/queue.go new file mode 100644 index 00000000000..556dce9f9c2 --- /dev/null +++ b/exporter/exporterqueue/queue.go @@ -0,0 +1,96 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exporterqueue // import "go.opentelemetry.io/collector/exporter/exporterqueue" + +import ( + "context" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/internal/queue" +) + +// Queue defines a producer-consumer exchange which can be backed by e.g. the memory-based ring buffer queue +// (boundedMemoryQueue) or via a disk-based queue (persistentQueue) +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type Queue[T any] queue.Queue[T] + +// Settings defines settings for creating a queue. +type Settings struct { + DataType component.DataType + ExporterSettings exporter.CreateSettings +} + +// Marshaler is a function that can marshal a request into bytes. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type Marshaler[T any] func(T) ([]byte, error) + +// Unmarshaler is a function that can unmarshal bytes into a request. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type Unmarshaler[T any] func([]byte) (T, error) + +// Factory is a function that creates a new queue. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type Factory[T any] func(context.Context, Settings, Config) Queue[T] + +// NewMemoryQueueFactory returns a factory to create a new memory queue. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +func NewMemoryQueueFactory[T itemsCounter]() Factory[T] { + return func(_ context.Context, _ Settings, cfg Config) Queue[T] { + return queue.NewBoundedMemoryQueue[T](queue.MemoryQueueSettings[T]{ + Sizer: sizerFromConfig[T](cfg), + Capacity: capacityFromConfig(cfg), + }) + } +} + +// PersistentQueueSettings defines developer settings for the persistent queue factory. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type PersistentQueueSettings[T any] struct { + // Marshaler is used to serialize queue elements before storing them in the persistent storage. + Marshaler Marshaler[T] + // Unmarshaler is used to deserialize requests after reading them from the persistent storage. + Unmarshaler Unmarshaler[T] +} + +// NewPersistentQueueFactory returns a factory to create a new persistent queue. +// If cfg.StorageID is nil then it falls back to memory queue. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +func NewPersistentQueueFactory[T itemsCounter](storageID *component.ID, factorySettings PersistentQueueSettings[T]) Factory[T] { + if storageID == nil { + return NewMemoryQueueFactory[T]() + } + return func(_ context.Context, set Settings, cfg Config) Queue[T] { + return queue.NewPersistentQueue[T](queue.PersistentQueueSettings[T]{ + Sizer: sizerFromConfig[T](cfg), + Capacity: capacityFromConfig(cfg), + DataType: set.DataType, + StorageID: *storageID, + Marshaler: factorySettings.Marshaler, + Unmarshaler: factorySettings.Unmarshaler, + ExporterSettings: set.ExporterSettings, + }) + } +} + +type itemsCounter interface { + ItemsCount() int +} + +func sizerFromConfig[T itemsCounter](Config) queue.Sizer[T] { + // TODO: Handle other ways to measure the queue size once they are added. + return &queue.RequestSizer[T]{} +} + +func capacityFromConfig(cfg Config) int { + // TODO: Handle other ways to measure the queue size once they are added. + return cfg.QueueSize +} diff --git a/exporter/exporterhelper/internal/bounded_memory_queue.go b/exporter/internal/queue/bounded_memory_queue.go similarity index 96% rename from exporter/exporterhelper/internal/bounded_memory_queue.go rename to exporter/internal/queue/bounded_memory_queue.go index 85435d2aa61..9f85e8496bf 100644 --- a/exporter/exporterhelper/internal/bounded_memory_queue.go +++ b/exporter/internal/queue/bounded_memory_queue.go @@ -3,7 +3,7 @@ // Copyright (c) 2017 Uber Technologies, Inc. // SPDX-License-Identifier: Apache-2.0 -package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal" +package queue // import "go.opentelemetry.io/collector/exporter/internal/queue" import ( "context" diff --git a/exporter/exporterhelper/internal/bounded_memory_queue_test.go b/exporter/internal/queue/bounded_memory_queue_test.go similarity index 99% rename from exporter/exporterhelper/internal/bounded_memory_queue_test.go rename to exporter/internal/queue/bounded_memory_queue_test.go index a26d32120cd..8daa1c2b933 100644 --- a/exporter/exporterhelper/internal/bounded_memory_queue_test.go +++ b/exporter/internal/queue/bounded_memory_queue_test.go @@ -3,7 +3,7 @@ // Copyright (c) 2017 Uber Technologies, Inc. // SPDX-License-Identifier: Apache-2.0 -package internal +package queue import ( "context" diff --git a/exporter/exporterhelper/internal/consumers.go b/exporter/internal/queue/consumers.go similarity index 73% rename from exporter/exporterhelper/internal/consumers.go rename to exporter/internal/queue/consumers.go index 88b729ebfed..7c57fea9620 100644 --- a/exporter/exporterhelper/internal/consumers.go +++ b/exporter/internal/queue/consumers.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal" +package queue // import "go.opentelemetry.io/collector/exporter/internal/queue" import ( "context" @@ -10,15 +10,15 @@ import ( "go.opentelemetry.io/collector/component" ) -type QueueConsumers[T any] struct { +type Consumers[T any] struct { queue Queue[T] numConsumers int consumeFunc func(context.Context, T) error stopWG sync.WaitGroup } -func NewQueueConsumers[T any](q Queue[T], numConsumers int, consumeFunc func(context.Context, T) error) *QueueConsumers[T] { - return &QueueConsumers[T]{ +func NewQueueConsumers[T any](q Queue[T], numConsumers int, consumeFunc func(context.Context, T) error) *Consumers[T] { + return &Consumers[T]{ queue: q, numConsumers: numConsumers, consumeFunc: consumeFunc, @@ -27,7 +27,7 @@ func NewQueueConsumers[T any](q Queue[T], numConsumers int, consumeFunc func(con } // Start ensures that queue and all consumers are started. -func (qc *QueueConsumers[T]) Start(ctx context.Context, host component.Host) error { +func (qc *Consumers[T]) Start(ctx context.Context, host component.Host) error { if err := qc.queue.Start(ctx, host); err != nil { return err } @@ -52,7 +52,7 @@ func (qc *QueueConsumers[T]) Start(ctx context.Context, host component.Host) err } // Shutdown ensures that queue and all consumers are stopped. -func (qc *QueueConsumers[T]) Shutdown(ctx context.Context) error { +func (qc *Consumers[T]) Shutdown(ctx context.Context) error { if err := qc.queue.Shutdown(ctx); err != nil { return err } diff --git a/exporter/exporterhelper/internal/err.go b/exporter/internal/queue/err.go similarity index 78% rename from exporter/exporterhelper/internal/err.go rename to exporter/internal/queue/err.go index c93bd92f556..a3b30ac9604 100644 --- a/exporter/exporterhelper/internal/err.go +++ b/exporter/internal/queue/err.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal" +package queue // import "go.opentelemetry.io/collector/exporter/internal/queue" type shutdownErr struct { err error diff --git a/exporter/exporterhelper/internal/mock_storage.go b/exporter/internal/queue/mock_storage.go similarity index 98% rename from exporter/exporterhelper/internal/mock_storage.go rename to exporter/internal/queue/mock_storage.go index 507e1e6946e..147e28c6cbf 100644 --- a/exporter/exporterhelper/internal/mock_storage.go +++ b/exporter/internal/queue/mock_storage.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal" +package queue // import "go.opentelemetry.io/collector/exporter/internal/queue" import ( "context" diff --git a/exporter/exporterhelper/internal/package_test.go b/exporter/internal/queue/package_test.go similarity index 91% rename from exporter/exporterhelper/internal/package_test.go rename to exporter/internal/queue/package_test.go index 4486cdb28aa..e73f51264a8 100644 --- a/exporter/exporterhelper/internal/package_test.go +++ b/exporter/internal/queue/package_test.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package internal +package queue import ( "testing" diff --git a/exporter/exporterhelper/internal/persistent_queue.go b/exporter/internal/queue/persistent_queue.go similarity index 99% rename from exporter/exporterhelper/internal/persistent_queue.go rename to exporter/internal/queue/persistent_queue.go index 9f23ab6a98c..bdcbeb07641 100644 --- a/exporter/exporterhelper/internal/persistent_queue.go +++ b/exporter/internal/queue/persistent_queue.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal" +package queue // import "go.opentelemetry.io/collector/exporter/internal/queue" import ( "context" diff --git a/exporter/exporterhelper/internal/persistent_queue_test.go b/exporter/internal/queue/persistent_queue_test.go similarity index 99% rename from exporter/exporterhelper/internal/persistent_queue_test.go rename to exporter/internal/queue/persistent_queue_test.go index 8b74f9a5fac..4bf0c082c77 100644 --- a/exporter/exporterhelper/internal/persistent_queue_test.go +++ b/exporter/internal/queue/persistent_queue_test.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package internal +package queue import ( "context" diff --git a/exporter/exporterhelper/internal/queue.go b/exporter/internal/queue/queue.go similarity index 93% rename from exporter/exporterhelper/internal/queue.go rename to exporter/internal/queue/queue.go index 8bd8879a940..0ae0703b05d 100644 --- a/exporter/exporterhelper/internal/queue.go +++ b/exporter/internal/queue/queue.go @@ -3,7 +3,7 @@ // Copyright (c) 2017 Uber Technologies, Inc. // SPDX-License-Identifier: Apache-2.0 -package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal" +package queue // import "go.opentelemetry.io/collector/exporter/internal/queue" import ( "context" diff --git a/exporter/exporterhelper/internal/queue_capacity.go b/exporter/internal/queue/queue_capacity.go similarity index 94% rename from exporter/exporterhelper/internal/queue_capacity.go rename to exporter/internal/queue/queue_capacity.go index 0466de0d8b2..1995febcd63 100644 --- a/exporter/exporterhelper/internal/queue_capacity.go +++ b/exporter/internal/queue/queue_capacity.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal" +package queue // import "go.opentelemetry.io/collector/exporter/internal/queue" import ( "sync/atomic" diff --git a/exporter/exporterhelper/internal/queue_capacity_test.go b/exporter/internal/queue/queue_capacity_test.go similarity index 98% rename from exporter/exporterhelper/internal/queue_capacity_test.go rename to exporter/internal/queue/queue_capacity_test.go index 7a3b3ad41f2..3dd6ad2b898 100644 --- a/exporter/exporterhelper/internal/queue_capacity_test.go +++ b/exporter/internal/queue/queue_capacity_test.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package internal +package queue import ( "testing"