From 8070a59b24bc26ba9e870507511dbb5145909c0a Mon Sep 17 00:00:00 2001 From: Dmitry Anoshin Date: Thu, 17 Aug 2023 23:18:04 -0700 Subject: [PATCH] [exporterhelper] Add queue options to the new exporter helper This change enables queueing capability for the new exporter helper. For now, it preserves the same user configuration interface as the existing exporter helper. The only difference is that persistence is now optional, since it requires providing marshal and unmarshal functions for the custom request. Later, more options for controlling the queue can be introduced, such as limits on the number of items or bytes in the queue. --- .chloggen/exporter-helper-v2.yaml | 33 +++++++ exporter/exporterhelper/common.go | 48 +++++++++- exporter/exporterhelper/logs_test.go | 48 ++++++++-- exporter/exporterhelper/metrics_test.go | 49 ++++++++-- exporter/exporterhelper/queued_retry.go | 58 ++++++++++++ exporter/exporterhelper/queued_retry_test.go | 98 ++++++++++++++++++++ exporter/exporterhelper/request.go | 10 ++ exporter/exporterhelper/request_test.go | 52 ++++++++--- exporter/exporterhelper/traces_test.go | 50 ++++++++-- 9 files changed, 407 insertions(+), 39 deletions(-) create mode 100644 .chloggen/exporter-helper-v2.yaml diff --git a/.chloggen/exporter-helper-v2.yaml b/.chloggen/exporter-helper-v2.yaml new file mode 100644 index 00000000000..7646f4ec03a --- /dev/null +++ b/.chloggen/exporter-helper-v2.yaml @@ -0,0 +1,33 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' change_type: enhancement + +# The name of the component, or a single word describing the area of concern (e.g. otlpreceiver) component: exporter/exporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). note: Add API for enabling queueing in the new exporter helpers. + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. subtext: | + The following experimental API is introduced in the exporter/exporterhelper package: + - `RequestMarshaler`: a new function type for marshaling client-provided requests. + - `RequestUnmarshaler`: a new function type for unmarshaling client-provided requests. + - `WithMemoryQueue`: a new exporter helper option for using a memory queue. + - `WithPersistentQueue`: a new exporter helper option for using a persistent queue. + - `QueueConfig`: a configuration for queueing requests used by the WithMemoryQueue option. + - `NewDefaultQueueConfig`: a function for creating a default QueueConfig. + - `PersistentQueueConfig`: a configuration for queueing requests in persistent storage used by the WithPersistentQueue option. + - `NewDefaultPersistentQueueConfig`: a function for creating a default PersistentQueueConfig. + All the new APIs are intended to be used by exporters that operate over client-provided requests instead of pdata. + +# One or more tracking issues or pull requests related to the change issues: [7874] + +# Optional: The change log or logs in which this entry should be included. # e.g. '[user]' or '[user, api]' # Include 'user' if the change is relevant to end users. # Include 'api' if there is a change to a library API. 
+# Default: '[user]' +change_logs: [api] diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go index 3dfe4c41311..ef5256c0e69 100644 --- a/exporter/exporterhelper/common.go +++ b/exporter/exporterhelper/common.go @@ -5,6 +5,7 @@ package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporte import ( "context" + "fmt" "time" "go.opentelemetry.io/collector/component" @@ -131,7 +132,8 @@ func WithRetry(retrySettings RetrySettings) Option { func WithQueue(config QueueSettings) Option { return func(o *baseSettings) { if o.requestExporter { - panic("queueing is not available for the new request exporters yet") + panic("this option is not available for the new request exporters, " + + "use WithMemoryQueue or WithPersistentQueue instead") } if !config.Enabled { return @@ -144,6 +146,50 @@ func WithQueue(config QueueSettings) Option { } } +// WithMemoryQueue overrides the default QueueConfig for an exporter to use an in-memory queue. +// This option should be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +func WithMemoryQueue(config QueueConfig) Option { + return func(o *baseSettings) { + if !config.Enabled { + return + } + o.queue = internal.NewBoundedMemoryQueue(config.QueueSize, config.NumConsumers) + } +} + +// WithPersistentQueue overrides the default QueueConfig for an exporter to use a persistent queue. +// This option should be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +func WithPersistentQueue(config PersistentQueueConfig, marshaler RequestMarshaler, unmarshaler RequestUnmarshaler) Option { + return func(o *baseSettings) { + if !config.Enabled { + return + } + o.queue = internal.NewPersistentQueue(config.QueueSize, config.NumConsumers, *config.StorageID, + func(req internal.Request) ([]byte, error) { + r, ok := req.(*request) + if !ok { + return nil, fmt.Errorf("invalid request type: %T", req) + } + return marshaler(r.Request) + }, + func(data []byte) (internal.Request, error) { + req, err := unmarshaler(data) + if err != nil { + return nil, err + } + return &request{ + Request: req, + baseRequest: baseRequest{ctx: context.Background()}, + }, nil + }, + ) + } +} + // WithCapabilities overrides the default Capabilities() function for a Consumer. // The default is non-mutable data. // TODO: Verify if we can change the default to be mutable as we do for processors. 
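To illustrate how these two options are meant to be consumed, here is a minimal usage sketch (not part of the diff) of an exporter wiring up the persistent queue through the new API. The package name, the myRequest type, its JSON encoding, and the helper functions are hypothetical; only Request, RequestItemsCounter, RequestMarshaler, RequestUnmarshaler, NewDefaultPersistentQueueConfig, and WithPersistentQueue come from this change.

package myexporter // hypothetical user package, not part of this patch

import (
	"context"
	"encoding/json"
	"fmt"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/exporter/exporterhelper"
)

// myRequest is an illustrative client-defined request; JSON is an arbitrary
// encoding chosen for the sketch.
type myRequest struct {
	Items []string `json:"items"`
}

// Export sends the request to the backend (left as a stub here).
func (r *myRequest) Export(_ context.Context) error { return nil }

// ItemsCount implements the optional RequestItemsCounter interface.
func (r *myRequest) ItemsCount() int { return len(r.Items) }

// marshalRequest and unmarshalRequest satisfy exporterhelper.RequestMarshaler and
// exporterhelper.RequestUnmarshaler; the persistent queue needs them because it
// stores client-defined requests as opaque bytes.
func marshalRequest(req exporterhelper.Request) ([]byte, error) {
	r, ok := req.(*myRequest)
	if !ok {
		return nil, fmt.Errorf("invalid request type: %T", req)
	}
	return json.Marshal(r)
}

func unmarshalRequest(data []byte) (exporterhelper.Request, error) {
	r := &myRequest{}
	if err := json.Unmarshal(data, r); err != nil {
		return nil, err
	}
	return r, nil
}

// persistentQueueOption builds the option as an exporter author would pass it to
// New[Traces|Metrics|Logs]RequestExporter.
func persistentQueueOption() exporterhelper.Option {
	qCfg := exporterhelper.NewDefaultPersistentQueueConfig()
	storageID := component.NewIDWithName("file_storage", "storage") // must reference a storage extension in the collector config
	qCfg.StorageID = &storageID
	return exporterhelper.WithPersistentQueue(qCfg, marshalRequest, unmarshalRequest)
}

The marshal/unmarshal pair is what makes persistence workable for client-defined requests: the queue only stores bytes, so it cannot serialize the request on its own, which is why persistence stays optional in the new helpers.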
diff --git a/exporter/exporterhelper/logs_test.go b/exporter/exporterhelper/logs_test.go index 070679b2e96..d88ac231aa2 100644 --- a/exporter/exporterhelper/logs_test.go +++ b/exporter/exporterhelper/logs_test.go @@ -6,6 +6,7 @@ package exporterhelper import ( "context" "errors" + "sync/atomic" "testing" "time" @@ -143,12 +144,12 @@ func TestLogsRequestExporter_Default_ConvertError(t *testing.T) { func TestLogsRequestExporter_Default_ExportError(t *testing.T) { ld := plog.NewLogs() - want := errors.New("export_error") + wantErr := errors.New("export_error") le, err := NewLogsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), - &fakeRequestConverter{requestError: want}) + &fakeRequestConverter{exportCallback: func(Request) error { return wantErr }}) require.NoError(t, err) require.NotNil(t, le) - require.Equal(t, want, le.ConsumeLogs(context.Background(), ld)) + require.Equal(t, wantErr, le.ConsumeLogs(context.Background(), ld)) } func TestLogsExporter_WithPersistentQueue(t *testing.T) { @@ -175,6 +176,34 @@ func TestLogsExporter_WithPersistentQueue(t *testing.T) { }, 500*time.Millisecond, 10*time.Millisecond) } +func TestLogsRequestExporter_WithPersistentQueue(t *testing.T) { + qCfg := NewDefaultPersistentQueueConfig() + storageID := component.NewIDWithName("file_storage", "storage") + qCfg.StorageID = &storageID + acc := &atomic.Uint32{} + set := exportertest.NewNopCreateSettings() + set.ID = component.NewIDWithName("test_logs_request", "with_persistent_queue") + rc := &fakeRequestConverter{exportCallback: func(req Request) error { + acc.Add(uint32(req.(RequestItemsCounter).ItemsCount())) + return nil + }} + te, err := NewLogsRequestExporter(context.Background(), set, rc, + WithPersistentQueue(qCfg, rc.requestMarshalerFunc(), rc.requestUnmarshalerFunc())) + require.NoError(t, err) + + host := &mockHost{ext: map[component.ID]component.Component{ + storageID: internal.NewMockStorageExtension(), + }} + require.NoError(t, te.Start(context.Background(), host)) + t.Cleanup(func() { require.NoError(t, te.Shutdown(context.Background())) }) + + require.NoError(t, te.ConsumeLogs(context.Background(), testdata.GenerateLogs(1))) + require.NoError(t, te.ConsumeLogs(context.Background(), testdata.GenerateLogs(2))) + require.Eventually(t, func() bool { + return acc.Load() == 3 + }, 500*time.Millisecond, 10*time.Millisecond) +} + func TestLogsExporter_WithRecordMetrics(t *testing.T) { tt, err := obsreporttest.SetupTelemetry(fakeLogsExporterName) require.NoError(t, err) @@ -213,17 +242,17 @@ func TestLogsExporter_WithRecordMetrics_ReturnError(t *testing.T) { } func TestLogsRequestExporter_WithRecordMetrics_ExportError(t *testing.T) { - want := errors.New("export_error") + wantErr := errors.New("export_error") tt, err := obsreporttest.SetupTelemetry(fakeLogsExporterName) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) le, err := NewLogsRequestExporter(context.Background(), tt.ToExporterCreateSettings(), - &fakeRequestConverter{requestError: want}) + &fakeRequestConverter{exportCallback: func(Request) error { return wantErr }}) require.Nil(t, err) require.NotNil(t, le) - checkRecordedMetricsForLogsExporter(t, tt, le, want) + checkRecordedMetricsForLogsExporter(t, tt, le, wantErr) } func TestLogsExporter_WithRecordEnqueueFailedMetrics(t *testing.T) { @@ -298,11 +327,12 @@ func TestLogsRequestExporter_WithSpan_ReturnError(t *testing.T) { otel.SetTracerProvider(set.TracerProvider) defer 
otel.SetTracerProvider(trace.NewNoopTracerProvider()) - want := errors.New("my_error") - le, err := NewLogsRequestExporter(context.Background(), set, &fakeRequestConverter{requestError: want}) + wantErr := errors.New("my_error") + le, err := NewLogsRequestExporter(context.Background(), set, + &fakeRequestConverter{exportCallback: func(Request) error { return wantErr }}) require.Nil(t, err) require.NotNil(t, le) - checkWrapSpanForLogsExporter(t, sr, set.TracerProvider.Tracer("test"), le, want, 1) + checkWrapSpanForLogsExporter(t, sr, set.TracerProvider.Tracer("test"), le, wantErr, 1) } func TestLogsExporter_WithShutdown(t *testing.T) { diff --git a/exporter/exporterhelper/metrics_test.go b/exporter/exporterhelper/metrics_test.go index 354a0686f89..73aa19e4c26 100644 --- a/exporter/exporterhelper/metrics_test.go +++ b/exporter/exporterhelper/metrics_test.go @@ -6,6 +6,7 @@ package exporterhelper import ( "context" "errors" + "sync/atomic" "testing" "time" @@ -144,12 +145,12 @@ func TestMetricsRequestExporter_Default_ConvertError(t *testing.T) { func TestMetricsRequestExporter_Default_ExportError(t *testing.T) { md := pmetric.NewMetrics() - want := errors.New("export_error") + wantErr := errors.New("export_error") me, err := NewMetricsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), - fakeRequestConverter{requestError: want}) + fakeRequestConverter{exportCallback: func(req Request) error { return wantErr }}) require.NoError(t, err) require.NotNil(t, me) - require.Equal(t, want, me.ConsumeMetrics(context.Background(), md)) + require.Equal(t, wantErr, me.ConsumeMetrics(context.Background(), md)) } func TestMetricsExporter_WithPersistentQueue(t *testing.T) { @@ -176,6 +177,34 @@ func TestMetricsExporter_WithPersistentQueue(t *testing.T) { }, 500*time.Millisecond, 10*time.Millisecond) } +func TestMetricsRequestExporter_WithPersistentQueue(t *testing.T) { + qCfg := NewDefaultPersistentQueueConfig() + storageID := component.NewIDWithName("file_storage", "storage") + qCfg.StorageID = &storageID + acc := &atomic.Uint32{} + set := exportertest.NewNopCreateSettings() + set.ID = component.NewIDWithName("test_metrics_request", "with_persistent_queue") + rc := &fakeRequestConverter{exportCallback: func(req Request) error { + acc.Add(uint32(req.(RequestItemsCounter).ItemsCount())) + return nil + }} + te, err := NewMetricsRequestExporter(context.Background(), set, rc, + WithPersistentQueue(qCfg, rc.requestMarshalerFunc(), rc.requestUnmarshalerFunc())) + require.NoError(t, err) + + host := &mockHost{ext: map[component.ID]component.Component{ + storageID: internal.NewMockStorageExtension(), + }} + require.NoError(t, te.Start(context.Background(), host)) + t.Cleanup(func() { require.NoError(t, te.Shutdown(context.Background())) }) + + require.NoError(t, te.ConsumeMetrics(context.Background(), testdata.GenerateMetrics(1))) // 2 data points + require.NoError(t, te.ConsumeMetrics(context.Background(), testdata.GenerateMetrics(2))) // 4 data points + require.Eventually(t, func() bool { + return acc.Load() == 6 + }, 500*time.Millisecond, 10*time.Millisecond) +} + func TestMetricsExporter_WithRecordMetrics(t *testing.T) { tt, err := obsreporttest.SetupTelemetry(fakeMetricsExporterName) require.NoError(t, err) @@ -214,16 +243,17 @@ func TestMetricsExporter_WithRecordMetrics_ReturnError(t *testing.T) { } func TestMetricsRequestExporter_WithRecordMetrics_ExportError(t *testing.T) { - want := errors.New("my_error") + wantErr := errors.New("my_error") tt, err := 
obsreporttest.SetupTelemetry(fakeMetricsExporterName) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - me, err := NewMetricsRequestExporter(context.Background(), tt.ToExporterCreateSettings(), fakeRequestConverter{requestError: want}) + me, err := NewMetricsRequestExporter(context.Background(), tt.ToExporterCreateSettings(), + fakeRequestConverter{exportCallback: func(req Request) error { return wantErr }}) require.NoError(t, err) require.NotNil(t, me) - checkRecordedMetricsForMetricsExporter(t, tt, me, want) + checkRecordedMetricsForMetricsExporter(t, tt, me, wantErr) } func TestMetricsExporter_WithRecordEnqueueFailedMetrics(t *testing.T) { @@ -298,11 +328,12 @@ func TestMetricsRequestExporter_WithSpan_ExportError(t *testing.T) { otel.SetTracerProvider(set.TracerProvider) defer otel.SetTracerProvider(trace.NewNoopTracerProvider()) - want := errors.New("my_error") - me, err := NewMetricsRequestExporter(context.Background(), set, fakeRequestConverter{requestError: want}) + wantErr := errors.New("my_error") + me, err := NewMetricsRequestExporter(context.Background(), set, + fakeRequestConverter{exportCallback: func(req Request) error { return wantErr }}) require.NoError(t, err) require.NotNil(t, me) - checkWrapSpanForMetricsExporter(t, sr, set.TracerProvider.Tracer("test"), me, want, 2) + checkWrapSpanForMetricsExporter(t, sr, set.TracerProvider.Tracer("test"), me, wantErr, 2) } func TestMetricsExporter_WithShutdown(t *testing.T) { diff --git a/exporter/exporterhelper/queued_retry.go b/exporter/exporterhelper/queued_retry.go index 2e2bc1f9a82..b5e5e9f4d9d 100644 --- a/exporter/exporterhelper/queued_retry.go +++ b/exporter/exporterhelper/queued_retry.go @@ -65,6 +65,64 @@ func (qCfg *QueueSettings) Validate() error { return nil } +// QueueConfig defines configuration for queueing requests before exporting. +// It is intended to be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type QueueConfig struct { + // Enabled indicates whether to enqueue batches before exporting. + Enabled bool `mapstructure:"enabled"` + // NumConsumers is the number of consumers from the queue. + NumConsumers int `mapstructure:"num_consumers"` + // QueueSize is the maximum number of batches allowed in the queue at a given time. + // This field is left for backward compatibility with QueueSettings. + // Later, it will be replaced with size fields specified explicitly in terms of items or batches. + QueueSize int `mapstructure:"queue_size"` +} + +// NewDefaultQueueConfig returns the default QueueConfig. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +func NewDefaultQueueConfig() QueueConfig { + return QueueConfig{ + Enabled: true, + NumConsumers: 10, + QueueSize: defaultQueueSize, + } +} + +// PersistentQueueConfig defines configuration for queueing requests before exporting using persistent storage. +// It is intended to be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter and will replace +// QueueSettings in the future. 
+// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type PersistentQueueConfig struct { + QueueConfig `mapstructure:",squash"` + // StorageID, if not empty, enables persistent storage and uses the specified component + // as a storage extension for the persistent queue. + StorageID *component.ID `mapstructure:"storage"` +} + +// NewDefaultPersistentQueueConfig returns the default PersistentQueueConfig. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +func NewDefaultPersistentQueueConfig() PersistentQueueConfig { + return PersistentQueueConfig{ + QueueConfig: NewDefaultQueueConfig(), + } +} + +// Validate checks if the QueueConfig configuration is valid +func (qCfg *QueueConfig) Validate() error { + if !qCfg.Enabled { + return nil + } + if qCfg.QueueSize <= 0 { + return errors.New("queue size must be positive") + } + return nil +} + type queuedRetrySender struct { fullName string id component.ID diff --git a/exporter/exporterhelper/queued_retry_test.go b/exporter/exporterhelper/queued_retry_test.go index 3e2f1fb694c..76ff94cdd5a 100644 --- a/exporter/exporterhelper/queued_retry_test.go +++ b/exporter/exporterhelper/queued_retry_test.go @@ -392,6 +392,18 @@ func TestQueueSettings_Validate(t *testing.T) { assert.NoError(t, qCfg.Validate()) } +func TestQueueConfig_Validate(t *testing.T) { + qCfg := NewDefaultQueueConfig() + assert.NoError(t, qCfg.Validate()) + + qCfg.QueueSize = 0 + assert.EqualError(t, qCfg.Validate(), "queue size must be positive") + + // Confirm Validate doesn't return an error for an invalid config when the feature is disabled + qCfg.Enabled = false + assert.NoError(t, qCfg.Validate()) +} + // if requeueing is enabled, we eventually retry even if we failed at first func TestQueuedRetry_RequeuingEnabled(t *testing.T) { qCfg := NewDefaultQueueSettings() @@ -456,6 +468,37 @@ func TestQueueRetryWithDisabledQueue(t *testing.T) { require.NoError(t, be.Shutdown(context.Background())) } +func TestMemoryQueue(t *testing.T) { + bs := newBaseSettings(true, nil, nil, WithMemoryQueue(NewDefaultQueueConfig())) + require.NotNil(t, bs.queue) + be, err := newBaseExporter(exportertest.NewNopCreateSettings(), bs, component.DataTypeLogs) + require.NoError(t, err) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + require.NoError(t, be.Shutdown(context.Background())) +} + +func TestMemoryQueueDisabled(t *testing.T) { + qs := NewDefaultQueueConfig() + qs.Enabled = false + bs := newBaseSettings(true, nil, nil, WithMemoryQueue(qs)) + require.Nil(t, bs.queue) + be, err := newBaseExporter(exportertest.NewNopCreateSettings(), bs, component.DataTypeLogs) + require.NoError(t, err) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + require.NoError(t, be.Shutdown(context.Background())) +} + +func TestPersistentQueueDisabled(t *testing.T) { + qs := NewDefaultPersistentQueueConfig() + qs.Enabled = false + bs := newBaseSettings(true, nil, nil, WithPersistentQueue(qs, nil, nil)) + require.Nil(t, bs.queue) + be, err := newBaseExporter(exportertest.NewNopCreateSettings(), bs, component.DataTypeLogs) + require.NoError(t, err) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + require.NoError(t, be.Shutdown(context.Background())) +} + func 
TestQueuedRetryPersistenceEnabled(t *testing.T) { tt, err := obsreporttest.SetupTelemetry(defaultID) require.NoError(t, err) @@ -505,6 +548,61 @@ func TestQueuedRetryPersistenceEnabledStorageError(t *testing.T) { require.Error(t, be.Start(context.Background(), host), "could not get storage client") } +func TestPersistentQueueRetryStorageError(t *testing.T) { + storageError := errors.New("could not get storage client") + tt, err := obsreporttest.SetupTelemetry(defaultID) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + qCfg := NewDefaultPersistentQueueConfig() + storageID := component.NewIDWithName("file_storage", "storage") + qCfg.StorageID = &storageID // enable persistence + rCfg := NewDefaultRetrySettings() + set := tt.ToExporterCreateSettings() + rc := fakeRequestConverter{} + bs := newBaseSettings(true, nil, nil, WithRetry(rCfg), + WithPersistentQueue(qCfg, rc.requestMarshalerFunc(), rc.requestUnmarshalerFunc())) + be, err := newBaseExporter(set, bs, "") + require.NoError(t, err) + + var extensions = map[component.ID]component.Component{ + storageID: &mockStorageExtension{GetClientError: storageError}, + } + host := &mockHost{ext: extensions} + + // we fail to start if we get an error creating the storage client + require.Error(t, be.Start(context.Background(), host), "could not get storage client") +} + +func TestPersistentQueueRetryUnmarshalError(t *testing.T) { + cfg := NewDefaultPersistentQueueConfig() + storageID := component.NewIDWithName("file_storage", "storage") + cfg.StorageID = &storageID // enable persistence + set := exportertest.NewNopCreateSettings() + set.ID = component.NewIDWithName("test_request", "with_persistent_queue") + unmarshalCalled := &atomic.Bool{} + rc := fakeRequestConverter{} + unmarshaler := func(bytes []byte) (Request, error) { + unmarshalCalled.Store(true) + return nil, errors.New("unmarshal error") + } + bs := newBaseSettings(true, nil, nil, WithPersistentQueue(cfg, rc.requestMarshalerFunc(), unmarshaler)) + be, err := newBaseExporter(set, bs, "") + require.NoError(t, err) + + require.Nil(t, be.Start(context.Background(), &mockHost{ext: map[component.ID]component.Component{ + storageID: internal.NewMockStorageExtension(), + }})) + + require.NoError(t, be.sender.send(&request{ + baseRequest: baseRequest{ctx: context.Background()}, + Request: rc.fakeRequest(1), + })) + require.Eventually(t, func() bool { return unmarshalCalled.Load() }, 100*time.Millisecond, 10*time.Millisecond) + + require.NoError(t, be.Shutdown(context.Background())) +} + func TestQueuedRetryPersistentEnabled_shutdown_dataIsRequeued(t *testing.T) { produceCounter := &atomic.Uint32{} diff --git a/exporter/exporterhelper/request.go b/exporter/exporterhelper/request.go index 407f1f27858..cff61481524 100644 --- a/exporter/exporterhelper/request.go +++ b/exporter/exporterhelper/request.go @@ -52,3 +52,13 @@ func (req *request) Count() int { } return 0 } + +// RequestMarshaler is a function that can marshal a Request into bytes. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type RequestMarshaler func(req Request) ([]byte, error) + +// RequestUnmarshaler is a function that can unmarshal bytes into a Request. 
+// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type RequestUnmarshaler func(data []byte) (Request, error) diff --git a/exporter/exporterhelper/request_test.go b/exporter/exporterhelper/request_test.go index 6dd3f67800a..954847932b0 100644 --- a/exporter/exporterhelper/request_test.go +++ b/exporter/exporterhelper/request_test.go @@ -5,6 +5,8 @@ package exporterhelper import ( "context" + "encoding/json" + "errors" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" @@ -12,33 +14,61 @@ import ( ) type fakeRequest struct { - items int - err error + Items int + exportCallback func(req Request) error } func (r fakeRequest) Export(_ context.Context) error { - return r.err + if r.exportCallback == nil { + return nil + } + return r.exportCallback(r) } func (r fakeRequest) ItemsCount() int { - return r.items + return r.Items } type fakeRequestConverter struct { - metricsError error - tracesError error - logsError error - requestError error + metricsError error + tracesError error + logsError error + exportCallback func(req Request) error } func (c fakeRequestConverter) RequestFromMetrics(_ context.Context, md pmetric.Metrics) (Request, error) { - return fakeRequest{items: md.DataPointCount(), err: c.requestError}, c.metricsError + return c.fakeRequest(md.DataPointCount()), c.metricsError } func (c fakeRequestConverter) RequestFromTraces(_ context.Context, td ptrace.Traces) (Request, error) { - return fakeRequest{items: td.SpanCount(), err: c.requestError}, c.tracesError + return c.fakeRequest(td.SpanCount()), c.tracesError } func (c fakeRequestConverter) RequestFromLogs(_ context.Context, ld plog.Logs) (Request, error) { - return fakeRequest{items: ld.LogRecordCount(), err: c.requestError}, c.logsError + return c.fakeRequest(ld.LogRecordCount()), c.logsError +} + +func (c fakeRequestConverter) fakeRequest(items int) Request { + return fakeRequest{Items: items, exportCallback: c.exportCallback} +} + +func (c fakeRequestConverter) requestMarshalerFunc() RequestMarshaler { + return func(req Request) ([]byte, error) { + r, ok := req.(fakeRequest) + if !ok { + return nil, errors.New("invalid request type") + } + return json.Marshal(r) + } +} + +func (c fakeRequestConverter) requestUnmarshalerFunc() RequestUnmarshaler { + return func(bytes []byte) (Request, error) { + var r fakeRequest + if err := json.Unmarshal(bytes, &r); err != nil { + return nil, err + } + r.exportCallback = c.exportCallback + return r, nil + } } diff --git a/exporter/exporterhelper/traces_test.go b/exporter/exporterhelper/traces_test.go index 3774a6015db..e837b202da8 100644 --- a/exporter/exporterhelper/traces_test.go +++ b/exporter/exporterhelper/traces_test.go @@ -6,6 +6,7 @@ package exporterhelper import ( "context" "errors" + "sync/atomic" "testing" "time" @@ -142,11 +143,12 @@ func TestTracesRequestExporter_Default_ConvertError(t *testing.T) { func TestTracesRequestExporter_Default_ExportError(t *testing.T) { td := ptrace.NewTraces() - want := errors.New("export_error") - te, err := NewTracesRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{requestError: want}) + wantErr := errors.New("export_error") + te, err := NewTracesRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), + &fakeRequestConverter{exportCallback: func(req Request) error { return wantErr }}) require.NoError(t, err) 
require.NotNil(t, te) - require.Equal(t, want, te.ConsumeTraces(context.Background(), td)) + require.Equal(t, wantErr, te.ConsumeTraces(context.Background(), td)) } func TestTracesExporter_WithPersistentQueue(t *testing.T) { @@ -173,6 +175,34 @@ func TestTracesExporter_WithPersistentQueue(t *testing.T) { }, 500*time.Millisecond, 10*time.Millisecond) } +func TestTracesRequestExporter_WithPersistentQueue(t *testing.T) { + qCfg := NewDefaultPersistentQueueConfig() + storageID := component.NewIDWithName("file_storage", "storage") + qCfg.StorageID = &storageID + acc := &atomic.Uint32{} + set := exportertest.NewNopCreateSettings() + set.ID = component.NewIDWithName("test_traces_request", "with_persistent_queue") + rc := &fakeRequestConverter{exportCallback: func(req Request) error { + acc.Add(uint32(req.(RequestItemsCounter).ItemsCount())) + return nil + }} + te, err := NewTracesRequestExporter(context.Background(), set, rc, + WithPersistentQueue(qCfg, rc.requestMarshalerFunc(), rc.requestUnmarshalerFunc())) + require.NoError(t, err) + + host := &mockHost{ext: map[component.ID]component.Component{ + storageID: internal.NewMockStorageExtension(), + }} + require.NoError(t, te.Start(context.Background(), host)) + t.Cleanup(func() { require.NoError(t, te.Shutdown(context.Background())) }) + + require.NoError(t, te.ConsumeTraces(context.Background(), testdata.GenerateTraces(1))) + require.NoError(t, te.ConsumeTraces(context.Background(), testdata.GenerateTraces(2))) + require.Eventually(t, func() bool { + return acc.Load() == 3 + }, 500*time.Millisecond, 10*time.Millisecond) +} + func TestTracesExporter_WithRecordMetrics(t *testing.T) { tt, err := obsreporttest.SetupTelemetry(fakeTracesExporterName) require.NoError(t, err) @@ -211,16 +241,17 @@ func TestTracesExporter_WithRecordMetrics_ReturnError(t *testing.T) { } func TestTracesRequestExporter_WithRecordMetrics_RequestSenderError(t *testing.T) { - want := errors.New("export_error") + wantErr := errors.New("export_error") tt, err := obsreporttest.SetupTelemetry(fakeTracesExporterName) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - te, err := NewTracesRequestExporter(context.Background(), tt.ToExporterCreateSettings(), &fakeRequestConverter{requestError: want}) + te, err := NewTracesRequestExporter(context.Background(), tt.ToExporterCreateSettings(), + &fakeRequestConverter{exportCallback: func(req Request) error { return wantErr }}) require.NoError(t, err) require.NotNil(t, te) - checkRecordedMetricsForTracesExporter(t, tt, te, want) + checkRecordedMetricsForTracesExporter(t, tt, te, wantErr) } func TestTracesExporter_WithRecordEnqueueFailedMetrics(t *testing.T) { @@ -298,12 +329,13 @@ func TestTracesRequestExporter_WithSpan_ExportError(t *testing.T) { otel.SetTracerProvider(set.TracerProvider) defer otel.SetTracerProvider(trace.NewNoopTracerProvider()) - want := errors.New("export_error") - te, err := NewTracesRequestExporter(context.Background(), set, &fakeRequestConverter{requestError: want}) + wantErr := errors.New("export_error") + te, err := NewTracesRequestExporter(context.Background(), set, + &fakeRequestConverter{exportCallback: func(req Request) error { return wantErr }}) require.NoError(t, err) require.NotNil(t, te) - checkWrapSpanForTracesExporter(t, sr, set.TracerProvider.Tracer("test"), te, want, 1) + checkWrapSpanForTracesExporter(t, sr, set.TracerProvider.Tracer("test"), te, wantErr, 1) } func TestTracesExporter_WithShutdown(t *testing.T) {
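As a companion to the persistent-queue sketch earlier, here is a minimal sketch (again outside the patch) of the in-memory variant, which needs no marshaler or unmarshaler because nothing is persisted. The package name and the tuned values are assumptions for illustration only; NewDefaultQueueConfig and WithMemoryQueue come from this change.

package myexporter // hypothetical user package, continuing the earlier sketch

import "go.opentelemetry.io/collector/exporter/exporterhelper"

// memoryQueueOption configures the in-memory queue added by this change; the
// values below are arbitrary examples, not recommended defaults.
func memoryQueueOption() exporterhelper.Option {
	qCfg := exporterhelper.NewDefaultQueueConfig()
	qCfg.NumConsumers = 4 // goroutines draining the queue concurrently
	qCfg.QueueSize = 1000 // maximum number of requests held in memory
	return exporterhelper.WithMemoryQueue(qCfg)
}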