Skip to content

Commit

Permalink
support WithQueue -> WithQueueConfig
Browse files Browse the repository at this point in the history
  • Loading branch information
atoulme committed Jan 25, 2024
1 parent 4116477 commit 513e1aa
Show file tree
Hide file tree
Showing 9 changed files with 40 additions and 32 deletions.
12 changes: 10 additions & 2 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,18 @@ func WithRetry(config configretry.BackOffConfig) Option {
}
}

// WithQueue overrides the default QueueConfig for an exporter.
// WithQueue overrides the default QueueSettings for an exporter.
// The default QueueSettings is to disable queueing.
// This option cannot be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
// Deprecated: [v0.94.0] Use WithQueueConfig
func WithQueue(config QueueSettings) Option {
return WithQueueConfig(QueueConfig(config))

Check warning on line 87 in exporter/exporterhelper/common.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterhelper/common.go#L87

Added line #L87 was not covered by tests
}

// WithQueueConfig overrides the default QueueConfig for an exporter.
// The default QueueConfig is to disable queueing.
// This option cannot be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
func WithQueue(config QueueConfig) Option {
func WithQueueConfig(config QueueConfig) Option {
return func(o *baseExporter) {
if o.requestExporter {
panic("queueing is not available for the new request exporters yet")
Expand Down
2 changes: 1 addition & 1 deletion exporter/exporterhelper/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestQueueRetryOptionsWithRequestExporter(t *testing.T) {
require.True(t, bs.requestExporter)
require.Panics(t, func() {
_, _ = newBaseExporter(exportertest.NewNopCreateSettings(), "", true, nil, nil, newNoopObsrepSender,
WithRetry(configretry.NewDefaultBackOffConfig()), WithQueue(NewDefaultQueueConfig()))
WithRetry(configretry.NewDefaultBackOffConfig()), WithQueueConfig(NewDefaultQueueConfig()))
})
}

Expand Down
4 changes: 2 additions & 2 deletions exporter/exporterhelper/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func TestLogsExporter_WithPersistentQueue(t *testing.T) {
ts := consumertest.LogsSink{}
set := exportertest.NewNopCreateSettings()
set.ID = component.NewIDWithName("test_logs", "with_persistent_queue")
te, err := NewLogsExporter(context.Background(), set, &fakeLogsExporterConfig, ts.ConsumeLogs, WithRetry(rCfg), WithQueue(qCfg))
te, err := NewLogsExporter(context.Background(), set, &fakeLogsExporterConfig, ts.ConsumeLogs, WithRetry(rCfg), WithQueueConfig(qCfg))
require.NoError(t, err)

host := &mockHost{ext: map[component.ID]component.Component{
Expand Down Expand Up @@ -242,7 +242,7 @@ func TestLogsExporter_WithRecordEnqueueFailedMetrics(t *testing.T) {
qCfg.NumConsumers = 1
qCfg.QueueSize = 2
wantErr := errors.New("some-error")
te, err := NewLogsExporter(context.Background(), exporter.CreateSettings{ID: fakeLogsExporterName, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, &fakeLogsExporterConfig, newPushLogsData(wantErr), WithRetry(rCfg), WithQueue(qCfg))
te, err := NewLogsExporter(context.Background(), exporter.CreateSettings{ID: fakeLogsExporterName, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, &fakeLogsExporterConfig, newPushLogsData(wantErr), WithRetry(rCfg), WithQueueConfig(qCfg))
require.NoError(t, err)
require.NotNil(t, te)

Expand Down
4 changes: 2 additions & 2 deletions exporter/exporterhelper/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func TestMetricsExporter_WithPersistentQueue(t *testing.T) {
ms := consumertest.MetricsSink{}
set := exportertest.NewNopCreateSettings()
set.ID = component.NewIDWithName("test_metrics", "with_persistent_queue")
te, err := NewMetricsExporter(context.Background(), set, &fakeTracesExporterConfig, ms.ConsumeMetrics, WithRetry(rCfg), WithQueue(qCfg))
te, err := NewMetricsExporter(context.Background(), set, &fakeTracesExporterConfig, ms.ConsumeMetrics, WithRetry(rCfg), WithQueueConfig(qCfg))
require.NoError(t, err)

host := &mockHost{ext: map[component.ID]component.Component{
Expand Down Expand Up @@ -244,7 +244,7 @@ func TestMetricsExporter_WithRecordEnqueueFailedMetrics(t *testing.T) {
qCfg.NumConsumers = 1
qCfg.QueueSize = 2
wantErr := errors.New("some-error")
te, err := NewMetricsExporter(context.Background(), exporter.CreateSettings{ID: fakeMetricsExporterName, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, &fakeMetricsExporterConfig, newPushMetricsData(wantErr), WithRetry(rCfg), WithQueue(qCfg))
te, err := NewMetricsExporter(context.Background(), exporter.CreateSettings{ID: fakeMetricsExporterName, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, &fakeMetricsExporterConfig, newPushMetricsData(wantErr), WithRetry(rCfg), WithQueueConfig(qCfg))
require.NoError(t, err)
require.NotNil(t, te)

Expand Down
22 changes: 11 additions & 11 deletions exporter/exporterhelper/queue_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestQueuedRetry_StopWhileWaiting(t *testing.T) {
qCfg := NewDefaultQueueConfig()
qCfg.NumConsumers = 1
rCfg := configretry.NewDefaultBackOffConfig()
be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg))
be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueueConfig(qCfg))
require.NoError(t, err)
ocs := be.obsrepSender.(*observabilityConsumerSender)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
Expand Down Expand Up @@ -58,7 +58,7 @@ func TestQueuedRetry_DoNotPreserveCancellation(t *testing.T) {
qCfg := NewDefaultQueueConfig()
qCfg.NumConsumers = 1
rCfg := configretry.NewDefaultBackOffConfig()
be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg))
be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueueConfig(qCfg))
require.NoError(t, err)
ocs := be.obsrepSender.(*observabilityConsumerSender)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
Expand Down Expand Up @@ -88,7 +88,7 @@ func TestQueuedRetry_RejectOnFull(t *testing.T) {
set := exportertest.NewNopCreateSettings()
logger, observed := observer.New(zap.ErrorLevel)
set.Logger = zap.New(logger)
be, err := newBaseExporter(set, "", false, nil, nil, newNoopObsrepSender, WithQueue(qCfg))
be, err := newBaseExporter(set, "", false, nil, nil, newNoopObsrepSender, WithQueueConfig(qCfg))
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() {
Expand All @@ -108,7 +108,7 @@ func TestQueuedRetryHappyPath(t *testing.T) {
qCfg := NewDefaultQueueConfig()
rCfg := configretry.NewDefaultBackOffConfig()
set := exporter.CreateSettings{ID: defaultID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}
be, err := newBaseExporter(set, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg))
be, err := newBaseExporter(set, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueueConfig(qCfg))
require.NoError(t, err)
ocs := be.obsrepSender.(*observabilityConsumerSender)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
Expand Down Expand Up @@ -145,7 +145,7 @@ func TestQueuedRetry_QueueMetricsReported(t *testing.T) {
qCfg.NumConsumers = 0 // to make every request go straight to the queue
rCfg := configretry.NewDefaultBackOffConfig()
set := exporter.CreateSettings{ID: defaultID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}
be, err := newBaseExporter(set, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg))
be, err := newBaseExporter(set, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueueConfig(qCfg))
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))

Expand Down Expand Up @@ -199,7 +199,7 @@ func TestQueueRetryWithDisabledQueue(t *testing.T) {
logger, observed := observer.New(zap.ErrorLevel)
set.Logger = zap.New(logger)
be, err := newBaseExporter(set, component.DataTypeLogs, false, nil, nil, newObservabilityConsumerSender,
WithQueue(qs))
WithQueueConfig(qs))
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
ocs := be.obsrepSender.(*observabilityConsumerSender)
Expand All @@ -220,7 +220,7 @@ func TestQueueFailedRequestDropped(t *testing.T) {
set := exportertest.NewNopCreateSettings()
logger, observed := observer.New(zap.ErrorLevel)
set.Logger = zap.New(logger)
be, err := newBaseExporter(set, component.DataTypeLogs, false, nil, nil, newNoopObsrepSender, WithQueue(NewDefaultQueueConfig()))
be, err := newBaseExporter(set, component.DataTypeLogs, false, nil, nil, newNoopObsrepSender, WithQueueConfig(NewDefaultQueueConfig()))
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
mockR := newMockRequest(2, errors.New("some error"))
Expand All @@ -241,7 +241,7 @@ func TestQueuedRetryPersistenceEnabled(t *testing.T) {
qCfg.StorageID = &storageID // enable persistence
rCfg := configretry.NewDefaultBackOffConfig()
set := exporter.CreateSettings{ID: defaultID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}
be, err := newBaseExporter(set, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg))
be, err := newBaseExporter(set, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueueConfig(qCfg))
require.NoError(t, err)

var extensions = map[component.ID]component.Component{
Expand All @@ -265,7 +265,7 @@ func TestQueuedRetryPersistenceEnabledStorageError(t *testing.T) {
qCfg.StorageID = &storageID // enable persistence
rCfg := configretry.NewDefaultBackOffConfig()
set := exporter.CreateSettings{ID: defaultID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}
be, err := newBaseExporter(set, "", false, mockRequestMarshaler, mockRequestUnmarshaler(&mockRequest{}), newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg))
be, err := newBaseExporter(set, "", false, mockRequestMarshaler, mockRequestUnmarshaler(&mockRequest{}), newObservabilityConsumerSender, WithRetry(rCfg), WithQueueConfig(qCfg))
require.NoError(t, err)

var extensions = map[component.ID]component.Component{
Expand All @@ -289,7 +289,7 @@ func TestQueuedRetryPersistentEnabled_NoDataLossOnShutdown(t *testing.T) {

mockReq := newErrorRequest()
be, err := newBaseExporter(defaultSettings, "", false, mockRequestMarshaler, mockRequestUnmarshaler(mockReq),
newNoopObsrepSender, WithRetry(rCfg), WithQueue(qCfg))
newNoopObsrepSender, WithRetry(rCfg), WithQueueConfig(qCfg))
require.NoError(t, err)

var extensions = map[component.ID]component.Component{
Expand All @@ -313,7 +313,7 @@ func TestQueuedRetryPersistentEnabled_NoDataLossOnShutdown(t *testing.T) {
// start the exporter again replacing the preserved mockRequest in the unmarshaler with a new one that doesn't fail.
replacedReq := newMockRequest(1, nil)
be, err = newBaseExporter(defaultSettings, "", false, mockRequestMarshaler, mockRequestUnmarshaler(replacedReq),
newNoopObsrepSender, WithRetry(rCfg), WithQueue(qCfg))
newNoopObsrepSender, WithRetry(rCfg), WithQueueConfig(qCfg))
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), host))
t.Cleanup(func() { require.NoError(t, be.Shutdown(context.Background())) })
Expand Down
12 changes: 6 additions & 6 deletions exporter/exporterhelper/retry_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestQueuedRetry_DropOnPermanentError(t *testing.T) {
qCfg := NewDefaultQueueConfig()
rCfg := configretry.NewDefaultBackOffConfig()
mockR := newMockRequest(2, consumererror.NewPermanent(errors.New("bad data")))
be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg))
be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueueConfig(qCfg))
require.NoError(t, err)
ocs := be.obsrepSender.(*observabilityConsumerSender)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
Expand All @@ -63,7 +63,7 @@ func TestQueuedRetry_DropOnNoRetry(t *testing.T) {
rCfg.Enabled = false
be, err := newBaseExporter(defaultSettings, "", false, mockRequestMarshaler,
mockRequestUnmarshaler(newMockRequest(2, errors.New("transient error"))),
newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg))
newObservabilityConsumerSender, WithRetry(rCfg), WithQueueConfig(qCfg))
require.NoError(t, err)
ocs := be.obsrepSender.(*observabilityConsumerSender)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
Expand All @@ -88,7 +88,7 @@ func TestQueuedRetry_OnError(t *testing.T) {
qCfg.NumConsumers = 1
rCfg := configretry.NewDefaultBackOffConfig()
rCfg.InitialInterval = 0
be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg))
be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueueConfig(qCfg))
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() {
Expand Down Expand Up @@ -116,7 +116,7 @@ func TestQueuedRetry_MaxElapsedTime(t *testing.T) {
rCfg := configretry.NewDefaultBackOffConfig()
rCfg.InitialInterval = time.Millisecond
rCfg.MaxElapsedTime = 100 * time.Millisecond
be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg))
be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueueConfig(qCfg))
require.NoError(t, err)
ocs := be.obsrepSender.(*observabilityConsumerSender)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
Expand Down Expand Up @@ -162,7 +162,7 @@ func TestQueuedRetry_ThrottleError(t *testing.T) {
qCfg.NumConsumers = 1
rCfg := configretry.NewDefaultBackOffConfig()
rCfg.InitialInterval = 10 * time.Millisecond
be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg))
be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueueConfig(qCfg))
require.NoError(t, err)
ocs := be.obsrepSender.(*observabilityConsumerSender)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
Expand Down Expand Up @@ -194,7 +194,7 @@ func TestQueuedRetry_RetryOnError(t *testing.T) {
qCfg.QueueSize = 1
rCfg := configretry.NewDefaultBackOffConfig()
rCfg.InitialInterval = 0
be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg))
be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueueConfig(qCfg))
require.NoError(t, err)
ocs := be.obsrepSender.(*observabilityConsumerSender)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
Expand Down
4 changes: 2 additions & 2 deletions exporter/exporterhelper/traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func TestTracesExporter_WithPersistentQueue(t *testing.T) {
ts := consumertest.TracesSink{}
set := exportertest.NewNopCreateSettings()
set.ID = component.NewIDWithName("test_traces", "with_persistent_queue")
te, err := NewTracesExporter(context.Background(), set, &fakeTracesExporterConfig, ts.ConsumeTraces, WithRetry(rCfg), WithQueue(qCfg))
te, err := NewTracesExporter(context.Background(), set, &fakeTracesExporterConfig, ts.ConsumeTraces, WithRetry(rCfg), WithQueueConfig(qCfg))
require.NoError(t, err)

host := &mockHost{ext: map[component.ID]component.Component{
Expand Down Expand Up @@ -241,7 +241,7 @@ func TestTracesExporter_WithRecordEnqueueFailedMetrics(t *testing.T) {
qCfg.NumConsumers = 1
qCfg.QueueSize = 2
wantErr := errors.New("some-error")
te, err := NewTracesExporter(context.Background(), exporter.CreateSettings{ID: fakeTracesExporterName, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, &fakeTracesExporterConfig, newTraceDataPusher(wantErr), WithRetry(rCfg), WithQueue(qCfg))
te, err := NewTracesExporter(context.Background(), exporter.CreateSettings{ID: fakeTracesExporterName, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, &fakeTracesExporterConfig, newTraceDataPusher(wantErr), WithRetry(rCfg), WithQueueConfig(qCfg))
require.NoError(t, err)
require.NotNil(t, te)

Expand Down
6 changes: 3 additions & 3 deletions exporter/otlpexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func createTracesExporter(
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
exporterhelper.WithTimeout(oCfg.TimeoutSettings),
exporterhelper.WithRetry(oCfg.RetryConfig),
exporterhelper.WithQueue(oCfg.QueueConfig),
exporterhelper.WithQueueConfig(oCfg.QueueConfig),
exporterhelper.WithStart(oce.start),
exporterhelper.WithShutdown(oce.shutdown))
}
Expand All @@ -78,7 +78,7 @@ func createMetricsExporter(
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
exporterhelper.WithTimeout(oCfg.TimeoutSettings),
exporterhelper.WithRetry(oCfg.RetryConfig),
exporterhelper.WithQueue(oCfg.QueueConfig),
exporterhelper.WithQueueConfig(oCfg.QueueConfig),
exporterhelper.WithStart(oce.start),
exporterhelper.WithShutdown(oce.shutdown),
)
Expand All @@ -99,7 +99,7 @@ func createLogsExporter(
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
exporterhelper.WithTimeout(oCfg.TimeoutSettings),
exporterhelper.WithRetry(oCfg.RetryConfig),
exporterhelper.WithQueue(oCfg.QueueConfig),
exporterhelper.WithQueueConfig(oCfg.QueueConfig),
exporterhelper.WithStart(oce.start),
exporterhelper.WithShutdown(oce.shutdown),
)
Expand Down
6 changes: 3 additions & 3 deletions exporter/otlphttpexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func createTracesExporter(
// explicitly disable since we rely on http.Client timeout logic.
exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
exporterhelper.WithRetry(oCfg.RetryConfig),
exporterhelper.WithQueue(oCfg.QueueConfig))
exporterhelper.WithQueueConfig(oCfg.QueueConfig))
}

func createMetricsExporter(
Expand All @@ -115,7 +115,7 @@ func createMetricsExporter(
// explicitly disable since we rely on http.Client timeout logic.
exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
exporterhelper.WithRetry(oCfg.RetryConfig),
exporterhelper.WithQueue(oCfg.QueueConfig))
exporterhelper.WithQueueConfig(oCfg.QueueConfig))
}

func createLogsExporter(
Expand All @@ -141,5 +141,5 @@ func createLogsExporter(
// explicitly disable since we rely on http.Client timeout logic.
exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
exporterhelper.WithRetry(oCfg.RetryConfig),
exporterhelper.WithQueue(oCfg.QueueConfig))
exporterhelper.WithQueueConfig(oCfg.QueueConfig))
}

0 comments on commit 513e1aa

Please sign in to comment.