Skip to content

Commit

Permalink
[exporterhelper] Deprecate QueueSettings, use QueueConfig instead
Browse files Browse the repository at this point in the history
  • Loading branch information
atoulme committed Jan 25, 2024
1 parent 6b3b181 commit 0830409
Show file tree
Hide file tree
Showing 19 changed files with 86 additions and 47 deletions.
25 changes: 25 additions & 0 deletions .chloggen/queuesettings_queueconfig.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: deprecation

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Deprecate QueueSettings, use QueueConfig instead

# One or more tracking issues or pull requests related to the change
issues: [6767]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
6 changes: 3 additions & 3 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,10 @@ func WithRetry(config configretry.BackOffConfig) Option {
}
}

// WithQueue overrides the default QueueSettings for an exporter.
// The default QueueSettings is to disable queueing.
// WithQueue overrides the default QueueConfig for an exporter.
// The default QueueConfig is to disable queueing.
// This option cannot be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
func WithQueue(config QueueSettings) Option {
func WithQueue(config QueueConfig) Option {
return func(o *baseExporter) {
if o.requestExporter {
panic("queueing is not available for the new request exporters yet")
Expand Down
2 changes: 1 addition & 1 deletion exporter/exporterhelper/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestQueueRetryOptionsWithRequestExporter(t *testing.T) {
require.True(t, bs.requestExporter)
require.Panics(t, func() {
_, _ = newBaseExporter(exportertest.NewNopCreateSettings(), "", true, nil, nil, newNoopObsrepSender,
WithRetry(configretry.NewDefaultBackOffConfig()), WithQueue(NewDefaultQueueSettings()))
WithRetry(configretry.NewDefaultBackOffConfig()), WithQueue(NewDefaultQueueConfig()))
})
}

Expand Down
4 changes: 2 additions & 2 deletions exporter/exporterhelper/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func TestLogsRequestExporter_Default_ExportError(t *testing.T) {
}

func TestLogsExporter_WithPersistentQueue(t *testing.T) {
qCfg := NewDefaultQueueSettings()
qCfg := NewDefaultQueueConfig()
storageID := component.NewIDWithName("file_storage", "storage")
qCfg.StorageID = &storageID
rCfg := configretry.NewDefaultBackOffConfig()
Expand Down Expand Up @@ -238,7 +238,7 @@ func TestLogsExporter_WithRecordEnqueueFailedMetrics(t *testing.T) {
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

rCfg := configretry.NewDefaultBackOffConfig()
qCfg := NewDefaultQueueSettings()
qCfg := NewDefaultQueueConfig()
qCfg.NumConsumers = 1
qCfg.QueueSize = 2
wantErr := errors.New("some-error")
Expand Down
4 changes: 2 additions & 2 deletions exporter/exporterhelper/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func TestMetricsRequestExporter_Default_ExportError(t *testing.T) {
}

func TestMetricsExporter_WithPersistentQueue(t *testing.T) {
qCfg := NewDefaultQueueSettings()
qCfg := NewDefaultQueueConfig()
storageID := component.NewIDWithName("file_storage", "storage")
qCfg.StorageID = &storageID
rCfg := configretry.NewDefaultBackOffConfig()
Expand Down Expand Up @@ -240,7 +240,7 @@ func TestMetricsExporter_WithRecordEnqueueFailedMetrics(t *testing.T) {
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

rCfg := configretry.NewDefaultBackOffConfig()
qCfg := NewDefaultQueueSettings()
qCfg := NewDefaultQueueConfig()
qCfg.NumConsumers = 1
qCfg.QueueSize = 2
wantErr := errors.New("some-error")
Expand Down
22 changes: 18 additions & 4 deletions exporter/exporterhelper/queue_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,13 @@ var (
)

// QueueSettings defines configuration for queueing batches before sending to the consumerSender.
// Deprecated: [v0.94.0] Use QueueConfig instead
type QueueSettings struct {
QueueConfig
}

// QueueConfig defines configuration for queueing batches before sending to the consumerSender.
type QueueConfig struct {
// Enabled indicates whether to not enqueue batches before sending to the consumerSender.
Enabled bool `mapstructure:"enabled"`
// NumConsumers is the number of consumers from the queue.
Expand All @@ -40,9 +46,17 @@ type QueueSettings struct {
StorageID *component.ID `mapstructure:"storage"`
}

// NewDefaultQueueSettings returns the default settings for QueueSettings.
// NewDefaultQueueSettings returns the default settings for QueueConfig.
// Deprecated: [v0.94.0] Use NewDefaultQueueConfig instead
func NewDefaultQueueSettings() QueueSettings {
return QueueSettings{
NewDefaultQueueConfig(),
}

Check warning on line 54 in exporter/exporterhelper/queue_sender.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterhelper/queue_sender.go#L53-L54

Added lines #L53 - L54 were not covered by tests
}

// NewDefaultQueueConfig returns the default settings for QueueConfig.
func NewDefaultQueueConfig() QueueConfig {
return QueueConfig{
Enabled: true,
NumConsumers: 10,
// By default, batches are 8192 spans, for a total of up to 8 million spans in the queue
Expand All @@ -52,8 +66,8 @@ func NewDefaultQueueSettings() QueueSettings {
}
}

// Validate checks if the QueueSettings configuration is valid
func (qCfg *QueueSettings) Validate() error {
// Validate checks if the QueueConfig configuration is valid
func (qCfg *QueueConfig) Validate() error {
if !qCfg.Enabled {
return nil
}
Expand Down Expand Up @@ -82,7 +96,7 @@ type queueSender struct {
metricSize otelmetric.Int64ObservableGauge
}

func newQueueSender(config QueueSettings, set exporter.CreateSettings, signal component.DataType,
func newQueueSender(config QueueConfig, set exporter.CreateSettings, signal component.DataType,
marshaler RequestMarshaler, unmarshaler RequestUnmarshaler, consumeErrHandler func(error, Request)) *queueSender {

isPersistent := config.StorageID != nil
Expand Down
26 changes: 13 additions & 13 deletions exporter/exporterhelper/queue_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
)

func TestQueuedRetry_StopWhileWaiting(t *testing.T) {
qCfg := NewDefaultQueueSettings()
qCfg := NewDefaultQueueConfig()
qCfg.NumConsumers = 1
rCfg := configretry.NewDefaultBackOffConfig()
be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg))
Expand Down Expand Up @@ -55,7 +55,7 @@ func TestQueuedRetry_StopWhileWaiting(t *testing.T) {
}

func TestQueuedRetry_DoNotPreserveCancellation(t *testing.T) {
qCfg := NewDefaultQueueSettings()
qCfg := NewDefaultQueueConfig()
qCfg.NumConsumers = 1
rCfg := configretry.NewDefaultBackOffConfig()
be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg))
Expand All @@ -82,7 +82,7 @@ func TestQueuedRetry_DoNotPreserveCancellation(t *testing.T) {
}

func TestQueuedRetry_RejectOnFull(t *testing.T) {
qCfg := NewDefaultQueueSettings()
qCfg := NewDefaultQueueConfig()
qCfg.QueueSize = 0
qCfg.NumConsumers = 0
set := exportertest.NewNopCreateSettings()
Expand All @@ -105,7 +105,7 @@ func TestQueuedRetryHappyPath(t *testing.T) {
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

qCfg := NewDefaultQueueSettings()
qCfg := NewDefaultQueueConfig()
rCfg := configretry.NewDefaultBackOffConfig()
set := exporter.CreateSettings{ID: defaultID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}
be, err := newBaseExporter(set, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg))
Expand Down Expand Up @@ -141,7 +141,7 @@ func TestQueuedRetry_QueueMetricsReported(t *testing.T) {
tt, err := componenttest.SetupTelemetry(defaultID)
require.NoError(t, err)

qCfg := NewDefaultQueueSettings()
qCfg := NewDefaultQueueConfig()
qCfg.NumConsumers = 0 // to make every request go straight to the queue
rCfg := configretry.NewDefaultBackOffConfig()
set := exporter.CreateSettings{ID: defaultID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}
Expand Down Expand Up @@ -176,13 +176,13 @@ func TestNoCancellationContext(t *testing.T) {
}

func TestQueueSettings_Validate(t *testing.T) {
qCfg := NewDefaultQueueSettings()
qCfg := NewDefaultQueueConfig()
assert.NoError(t, qCfg.Validate())

qCfg.QueueSize = 0
assert.EqualError(t, qCfg.Validate(), "queue size must be positive")

qCfg = NewDefaultQueueSettings()
qCfg = NewDefaultQueueConfig()
qCfg.NumConsumers = 0

assert.EqualError(t, qCfg.Validate(), "number of queue consumers must be positive")
Expand All @@ -193,7 +193,7 @@ func TestQueueSettings_Validate(t *testing.T) {
}

func TestQueueRetryWithDisabledQueue(t *testing.T) {
qs := NewDefaultQueueSettings()
qs := NewDefaultQueueConfig()
qs.Enabled = false
set := exportertest.NewNopCreateSettings()
logger, observed := observer.New(zap.ErrorLevel)
Expand All @@ -220,7 +220,7 @@ func TestQueueFailedRequestDropped(t *testing.T) {
set := exportertest.NewNopCreateSettings()
logger, observed := observer.New(zap.ErrorLevel)
set.Logger = zap.New(logger)
be, err := newBaseExporter(set, component.DataTypeLogs, false, nil, nil, newNoopObsrepSender, WithQueue(NewDefaultQueueSettings()))
be, err := newBaseExporter(set, component.DataTypeLogs, false, nil, nil, newNoopObsrepSender, WithQueue(NewDefaultQueueConfig()))
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
mockR := newMockRequest(2, errors.New("some error"))
Expand All @@ -236,7 +236,7 @@ func TestQueuedRetryPersistenceEnabled(t *testing.T) {
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

qCfg := NewDefaultQueueSettings()
qCfg := NewDefaultQueueConfig()
storageID := component.NewIDWithName("file_storage", "storage")
qCfg.StorageID = &storageID // enable persistence
rCfg := configretry.NewDefaultBackOffConfig()
Expand All @@ -260,7 +260,7 @@ func TestQueuedRetryPersistenceEnabledStorageError(t *testing.T) {
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

qCfg := NewDefaultQueueSettings()
qCfg := NewDefaultQueueConfig()
storageID := component.NewIDWithName("file_storage", "storage")
qCfg.StorageID = &storageID // enable persistence
rCfg := configretry.NewDefaultBackOffConfig()
Expand All @@ -278,7 +278,7 @@ func TestQueuedRetryPersistenceEnabledStorageError(t *testing.T) {
}

func TestQueuedRetryPersistentEnabled_NoDataLossOnShutdown(t *testing.T) {
qCfg := NewDefaultQueueSettings()
qCfg := NewDefaultQueueConfig()
qCfg.NumConsumers = 1
storageID := component.NewIDWithName("file_storage", "storage")
qCfg.StorageID = &storageID // enable persistence to ensure data is re-queued on shutdown
Expand Down Expand Up @@ -323,7 +323,7 @@ func TestQueuedRetryPersistentEnabled_NoDataLossOnShutdown(t *testing.T) {
}

func TestQueueSenderNoStartShutdown(t *testing.T) {
qs := newQueueSender(NewDefaultQueueSettings(), exportertest.NewNopCreateSettings(), "", nil, nil, nil)
qs := newQueueSender(NewDefaultQueueConfig(), exportertest.NewNopCreateSettings(), "", nil, nil, nil)
assert.NoError(t, qs.Shutdown(context.Background()))
}

Expand Down
12 changes: 6 additions & 6 deletions exporter/exporterhelper/retry_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func mockRequestMarshaler(_ Request) ([]byte, error) {
}

func TestQueuedRetry_DropOnPermanentError(t *testing.T) {
qCfg := NewDefaultQueueSettings()
qCfg := NewDefaultQueueConfig()
rCfg := configretry.NewDefaultBackOffConfig()
mockR := newMockRequest(2, consumererror.NewPermanent(errors.New("bad data")))
be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg))
Expand All @@ -58,7 +58,7 @@ func TestQueuedRetry_DropOnPermanentError(t *testing.T) {
}

func TestQueuedRetry_DropOnNoRetry(t *testing.T) {
qCfg := NewDefaultQueueSettings()
qCfg := NewDefaultQueueConfig()
rCfg := configretry.NewDefaultBackOffConfig()
rCfg.Enabled = false
be, err := newBaseExporter(defaultSettings, "", false, mockRequestMarshaler,
Expand All @@ -84,7 +84,7 @@ func TestQueuedRetry_DropOnNoRetry(t *testing.T) {
}

func TestQueuedRetry_OnError(t *testing.T) {
qCfg := NewDefaultQueueSettings()
qCfg := NewDefaultQueueConfig()
qCfg.NumConsumers = 1
rCfg := configretry.NewDefaultBackOffConfig()
rCfg.InitialInterval = 0
Expand All @@ -111,7 +111,7 @@ func TestQueuedRetry_OnError(t *testing.T) {
}

func TestQueuedRetry_MaxElapsedTime(t *testing.T) {
qCfg := NewDefaultQueueSettings()
qCfg := NewDefaultQueueConfig()
qCfg.NumConsumers = 1
rCfg := configretry.NewDefaultBackOffConfig()
rCfg.InitialInterval = time.Millisecond
Expand Down Expand Up @@ -158,7 +158,7 @@ func (e wrappedError) Unwrap() error {
}

func TestQueuedRetry_ThrottleError(t *testing.T) {
qCfg := NewDefaultQueueSettings()
qCfg := NewDefaultQueueConfig()
qCfg.NumConsumers = 1
rCfg := configretry.NewDefaultBackOffConfig()
rCfg.InitialInterval = 10 * time.Millisecond
Expand Down Expand Up @@ -189,7 +189,7 @@ func TestQueuedRetry_ThrottleError(t *testing.T) {
}

func TestQueuedRetry_RetryOnError(t *testing.T) {
qCfg := NewDefaultQueueSettings()
qCfg := NewDefaultQueueConfig()
qCfg.NumConsumers = 1
qCfg.QueueSize = 1
rCfg := configretry.NewDefaultBackOffConfig()
Expand Down
4 changes: 2 additions & 2 deletions exporter/exporterhelper/traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func TestTracesRequestExporter_Default_ExportError(t *testing.T) {
}

func TestTracesExporter_WithPersistentQueue(t *testing.T) {
qCfg := NewDefaultQueueSettings()
qCfg := NewDefaultQueueConfig()
storageID := component.NewIDWithName("file_storage", "storage")
qCfg.StorageID = &storageID
rCfg := configretry.NewDefaultBackOffConfig()
Expand Down Expand Up @@ -237,7 +237,7 @@ func TestTracesExporter_WithRecordEnqueueFailedMetrics(t *testing.T) {
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

rCfg := configretry.NewDefaultBackOffConfig()
qCfg := NewDefaultQueueSettings()
qCfg := NewDefaultQueueConfig()
qCfg.NumConsumers = 1
qCfg.QueueSize = 2
wantErr := errors.New("some-error")
Expand Down
2 changes: 1 addition & 1 deletion exporter/otlpexporter/cfg-schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ fields:
doc: |
Timeout is the timeout for every attempt to send data to the backend.
- name: sending_queue
type: exporterhelper.QueueSettings
type: exporterhelper.QueueConfig
kind: struct
fields:
- name: enabled
Expand Down
6 changes: 3 additions & 3 deletions exporter/otlpexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ import (

// Config defines configuration for OTLP exporter.
type Config struct {
exporterhelper.TimeoutSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
QueueConfig exporterhelper.QueueSettings `mapstructure:"sending_queue"`
RetryConfig configretry.BackOffConfig `mapstructure:"retry_on_failure"`
exporterhelper.TimeoutSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
QueueConfig exporterhelper.QueueConfig `mapstructure:"sending_queue"`
RetryConfig configretry.BackOffConfig `mapstructure:"retry_on_failure"`

configgrpc.GRPCClientSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
}
Expand Down
2 changes: 1 addition & 1 deletion exporter/otlpexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestUnmarshalConfig(t *testing.T) {
MaxInterval: 1 * time.Minute,
MaxElapsedTime: 10 * time.Minute,
},
QueueConfig: exporterhelper.QueueSettings{
QueueConfig: exporterhelper.QueueConfig{
Enabled: true,
NumConsumers: 2,
QueueSize: 10,
Expand Down
2 changes: 1 addition & 1 deletion exporter/otlpexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func createDefaultConfig() component.Config {
return &Config{
TimeoutSettings: exporterhelper.NewDefaultTimeoutSettings(),
RetryConfig: configretry.NewDefaultBackOffConfig(),
QueueConfig: exporterhelper.NewDefaultQueueSettings(),
QueueConfig: exporterhelper.NewDefaultQueueConfig(),
GRPCClientSettings: configgrpc.GRPCClientSettings{
Headers: map[string]configopaque.String{},
// Default to gzip compression
Expand Down
2 changes: 1 addition & 1 deletion exporter/otlpexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestCreateDefaultConfig(t *testing.T) {
ocfg, ok := factory.CreateDefaultConfig().(*Config)
assert.True(t, ok)
assert.Equal(t, ocfg.RetryConfig, configretry.NewDefaultBackOffConfig())
assert.Equal(t, ocfg.QueueConfig, exporterhelper.NewDefaultQueueSettings())
assert.Equal(t, ocfg.QueueConfig, exporterhelper.NewDefaultQueueConfig())
assert.Equal(t, ocfg.TimeoutSettings, exporterhelper.NewDefaultTimeoutSettings())
assert.Equal(t, ocfg.Compression, configcompression.Gzip)
}
Expand Down
6 changes: 3 additions & 3 deletions exporter/otlphttpexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ import (

// Config defines configuration for OTLP/HTTP exporter.
type Config struct {
confighttp.HTTPClientSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
QueueConfig exporterhelper.QueueSettings `mapstructure:"sending_queue"`
RetryConfig configretry.BackOffConfig `mapstructure:"retry_on_failure"`
confighttp.HTTPClientSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
QueueConfig exporterhelper.QueueConfig `mapstructure:"sending_queue"`
RetryConfig configretry.BackOffConfig `mapstructure:"retry_on_failure"`

// The URL to send traces to. If omitted the Endpoint + "/v1/traces" will be used.
TracesEndpoint string `mapstructure:"traces_endpoint"`
Expand Down
2 changes: 1 addition & 1 deletion exporter/otlphttpexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestUnmarshalConfig(t *testing.T) {
MaxInterval: 1 * time.Minute,
MaxElapsedTime: 10 * time.Minute,
},
QueueConfig: exporterhelper.QueueSettings{
QueueConfig: exporterhelper.QueueConfig{
Enabled: true,
NumConsumers: 2,
QueueSize: 10,
Expand Down
Loading

0 comments on commit 0830409

Please sign in to comment.