[exporterhelper] Add WithRequestQueue option to the exporter
The new configuration interface for end users provides a `queue_size_items` option to limit the queue by the number of spans, log records, or metric data points. The previous way of limiting the queue by the number of requests is preserved under the same `queue_size` field, which will later be deprecated through a longer transition process.
dmitryax committed Feb 2, 2024
1 parent 7abb962 commit a6279d9
Showing 27 changed files with 434 additions and 137 deletions.
32 changes: 32 additions & 0 deletions .chloggen/exporter-helper-v2.yaml
@@ -0,0 +1,32 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporter/exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add API for enabling queue in the new exporter helpers.

# One or more tracking issues or pull requests related to the change
issues: [7874]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
The following experimental API is introduced in exporter package:
- `exporterhelper.WithRequestQueue`: a new exporter helper option for using a queue.
- `exporterqueue.Queue`: an interface for queue implementations.
- `exporterqueue.Factory`: a queue factory interface; implementations of this interface are intended to be used with the WithRequestQueue option.
- `exporterqueue.Settings`: queue factory settings.
- `exporterqueue.Config`: common configuration for queue implementations.
- `exporterqueue.NewDefaultConfig`: a function for creating a default queue configuration.
- `exporterqueue.NewMemoryQueueFactory`: a new factory for creating a memory queue.
- `exporterqueue.NewPersistentQueueFactory`: a factory for creating a persistent queue.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
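
For context, a minimal sketch of how a request exporter could opt into the new queue via the API listed in the changelog entry above. The exact signatures of `NewDefaultConfig` and `NewMemoryQueueFactory` (including its type parameter) are assumptions, not taken from this diff:

```go
package example // hypothetical illustration, not part of this commit

import (
	"go.opentelemetry.io/collector/exporter/exporterhelper"
	"go.opentelemetry.io/collector/exporter/exporterqueue"
)

// queueOption sketches wiring WithRequestQueue with the in-memory queue factory.
// NewDefaultConfig and NewMemoryQueueFactory are assumed from the changelog entry above.
func queueOption() exporterhelper.Option {
	cfg := exporterqueue.NewDefaultConfig() // assumed to return exporterqueue.Config
	cfg.NumConsumers = 4                    // fields shown in the common.go diff below
	cfg.QueueSize = 1000                    // request-count limit; item-based limits are a separate config option per the commit message

	return exporterhelper.WithRequestQueue(
		cfg,
		exporterqueue.NewMemoryQueueFactory[exporterhelper.Request](), // type parameter assumed
	)
}
```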
45 changes: 36 additions & 9 deletions exporter/exporterhelper/common.go
@@ -13,6 +13,7 @@ import (
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterqueue"
)

// requestSender is an abstraction of a sender for a request independent of the type of the data (traces, metrics, logs).
@@ -85,17 +86,43 @@ func WithRetry(config configretry.BackOffConfig) Option {
func WithQueue(config QueueSettings) Option {
return func(o *baseExporter) {
if o.requestExporter {
panic("queueing is not available for the new request exporters yet")
panic("WithQueue option is not available for the new request exporters, use WithRequestQueue instead")
}
if !config.Enabled {
o.exportFailureMessage += " Try enabling sending_queue to survive temporary failures."
return
}
consumeErrHandler := func(err error, req Request) {
o.set.Logger.Error("Exporting failed. Dropping data."+o.exportFailureMessage,
zap.Error(err), zap.Int("dropped_items", req.ItemsCount()))
qf := exporterqueue.NewPersistentQueueFactory[Request](config.StorageID, exporterqueue.PersistentQueueSettings[Request]{
Marshaler: o.marshaler,
Unmarshaler: o.unmarshaler,
})
q := qf(context.Background(), exporterqueue.Settings{
DataType: o.signal,
ExporterSettings: o.set,
}, exporterqueue.Config{
Enabled: config.Enabled,
NumConsumers: config.NumConsumers,
QueueSize: config.QueueSize,
})
o.queueSender = newQueueSender(q, o.set, config.NumConsumers, o.exportFailureMessage)
}
}

// WithRequestQueue enables queueing for an exporter.
// This option should be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
// This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
func WithRequestQueue(cfg exporterqueue.Config, queueFactory exporterqueue.Factory[Request]) Option {
return func(o *baseExporter) {
if !cfg.Enabled {
o.exportFailureMessage += " Try enabling sending_queue to survive temporary failures."
return
}
set := exporterqueue.Settings{
DataType: o.signal,
ExporterSettings: o.set,
}
o.queueSender = newQueueSender(config, o.set, o.signal, o.marshaler, o.unmarshaler, consumeErrHandler)
o.queueSender = newQueueSender(queueFactory(context.Background(), set, cfg), o.set, cfg.NumConsumers, o.exportFailureMessage)
}
}

@@ -114,8 +141,8 @@ type baseExporter struct {
component.ShutdownFunc

requestExporter bool
marshaler RequestMarshaler
unmarshaler RequestUnmarshaler
marshaler exporterqueue.Marshaler[Request]
unmarshaler exporterqueue.Unmarshaler[Request]
signal component.DataType

set exporter.CreateSettings
@@ -136,8 +163,8 @@ type baseExporter struct {
}

// TODO: requestExporter, marshaler, and unmarshaler arguments can be removed when the old exporter helpers will be updated to call the new ones.
func newBaseExporter(set exporter.CreateSettings, signal component.DataType, requestExporter bool, marshaler RequestMarshaler,
unmarshaler RequestUnmarshaler, osf obsrepSenderFactory, options ...Option) (*baseExporter, error) {
func newBaseExporter(set exporter.CreateSettings, signal component.DataType, requestExporter bool,
marshaler exporterqueue.Marshaler[Request], unmarshaler exporterqueue.Unmarshaler[Request], osf obsrepSenderFactory, options ...Option) (*baseExporter, error) {

obsReport, err := NewObsReport(ObsReportSettings{ExporterID: set.ID, ExporterCreateSettings: set})
if err != nil {
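To mirror what the updated `WithQueue` body above now does internally, a request exporter could pair `WithRequestQueue` with the persistent queue factory. The following sketch infers the `NewPersistentQueueFactory` call shape and the `Marshaler`/`Unmarshaler` types from this diff; treat it as an assumption rather than documented usage:

```go
package example // hypothetical illustration, not part of this commit

import (
	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/exporter/exporterhelper"
	"go.opentelemetry.io/collector/exporter/exporterqueue"
)

// marshalRequest and unmarshalRequest stand in for an exporter's own codecs;
// their shapes follow the Marshaler/Unmarshaler usage in this diff.
var (
	marshalRequest   exporterqueue.Marshaler[exporterhelper.Request]
	unmarshalRequest exporterqueue.Unmarshaler[exporterhelper.Request]
)

// persistentQueueOption builds the same kind of persistent queue that WithQueue
// constructs above, but through the new WithRequestQueue API.
func persistentQueueOption(storageID component.ID) exporterhelper.Option {
	factory := exporterqueue.NewPersistentQueueFactory[exporterhelper.Request](
		&storageID, // WithQueue passes config.StorageID, a *component.ID
		exporterqueue.PersistentQueueSettings[exporterhelper.Request]{
			Marshaler:   marshalRequest,
			Unmarshaler: unmarshalRequest,
		},
	)
	return exporterhelper.WithRequestQueue(exporterqueue.NewDefaultConfig(), factory)
}
```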
9 changes: 5 additions & 4 deletions exporter/exporterhelper/logs.go
@@ -13,7 +13,8 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
"go.opentelemetry.io/collector/exporter/exporterqueue"
"go.opentelemetry.io/collector/exporter/internal/queue"
"go.opentelemetry.io/collector/pdata/plog"
)

@@ -32,7 +33,7 @@ func newLogsRequest(ld plog.Logs, pusher consumer.ConsumeLogsFunc) Request {
}
}

func newLogsRequestUnmarshalerFunc(pusher consumer.ConsumeLogsFunc) RequestUnmarshaler {
func newLogsRequestUnmarshalerFunc(pusher consumer.ConsumeLogsFunc) exporterqueue.Unmarshaler[Request] {
return func(bytes []byte) (Request, error) {
logs, err := logsUnmarshaler.UnmarshalLogs(bytes)
if err != nil {
@@ -96,7 +97,7 @@ func NewLogsExporter(
lc, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error {
req := newLogsRequest(ld, pusher)
serr := be.send(ctx, req)
if errors.Is(serr, internal.ErrQueueIsFull) {
if errors.Is(serr, queue.ErrQueueIsFull) {
be.obsrep.recordEnqueueFailure(ctx, component.DataTypeLogs, int64(req.ItemsCount()))
}
return serr
@@ -144,7 +145,7 @@ func NewLogsRequestExporter(
return consumererror.NewPermanent(cErr)
}
sErr := be.send(ctx, req)
if errors.Is(sErr, internal.ErrQueueIsFull) {
if errors.Is(sErr, queue.ErrQueueIsFull) {
be.obsrep.recordEnqueueFailure(ctx, component.DataTypeLogs, int64(req.ItemsCount()))
}
return sErr
4 changes: 2 additions & 2 deletions exporter/exporterhelper/logs_test.go
@@ -25,8 +25,8 @@ import (
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/exporter/internal/queue"
"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
"go.opentelemetry.io/collector/internal/testdata"
"go.opentelemetry.io/collector/pdata/plog"
@@ -167,7 +167,7 @@ func TestLogsExporter_WithPersistentQueue(t *testing.T) {
require.NoError(t, err)

host := &mockHost{ext: map[component.ID]component.Component{
storageID: internal.NewMockStorageExtension(nil),
storageID: queue.NewMockStorageExtension(nil),
}}
require.NoError(t, te.Start(context.Background(), host))
t.Cleanup(func() { require.NoError(t, te.Shutdown(context.Background())) })
9 changes: 5 additions & 4 deletions exporter/exporterhelper/metrics.go
@@ -13,7 +13,8 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
"go.opentelemetry.io/collector/exporter/exporterqueue"
"go.opentelemetry.io/collector/exporter/internal/queue"
"go.opentelemetry.io/collector/pdata/pmetric"
)

@@ -32,7 +33,7 @@ func newMetricsRequest(md pmetric.Metrics, pusher consumer.ConsumeMetricsFunc) Request {
}
}

func newMetricsRequestUnmarshalerFunc(pusher consumer.ConsumeMetricsFunc) RequestUnmarshaler {
func newMetricsRequestUnmarshalerFunc(pusher consumer.ConsumeMetricsFunc) exporterqueue.Unmarshaler[Request] {
return func(bytes []byte) (Request, error) {
metrics, err := metricsUnmarshaler.UnmarshalMetrics(bytes)
if err != nil {
@@ -96,7 +97,7 @@ func NewMetricsExporter(
mc, err := consumer.NewMetrics(func(ctx context.Context, md pmetric.Metrics) error {
req := newMetricsRequest(md, pusher)
serr := be.send(ctx, req)
if errors.Is(serr, internal.ErrQueueIsFull) {
if errors.Is(serr, queue.ErrQueueIsFull) {
be.obsrep.recordEnqueueFailure(ctx, component.DataTypeMetrics, int64(req.ItemsCount()))
}
return serr
@@ -144,7 +145,7 @@ func NewMetricsRequestExporter(
return consumererror.NewPermanent(cErr)
}
sErr := be.send(ctx, req)
if errors.Is(sErr, internal.ErrQueueIsFull) {
if errors.Is(sErr, queue.ErrQueueIsFull) {
be.obsrep.recordEnqueueFailure(ctx, component.DataTypeMetrics, int64(req.ItemsCount()))
}
return sErr
4 changes: 2 additions & 2 deletions exporter/exporterhelper/metrics_test.go
@@ -25,8 +25,8 @@ import (
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/exporter/internal/queue"
"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
"go.opentelemetry.io/collector/internal/testdata"
"go.opentelemetry.io/collector/pdata/pmetric"
@@ -168,7 +168,7 @@ func TestMetricsExporter_WithPersistentQueue(t *testing.T) {
require.NoError(t, err)

host := &mockHost{ext: map[component.ID]component.Component{
storageID: internal.NewMockStorageExtension(nil),
storageID: queue.NewMockStorageExtension(nil),
}}
require.NoError(t, te.Start(context.Background(), host))
t.Cleanup(func() { require.NoError(t, te.Shutdown(context.Background())) })
38 changes: 10 additions & 28 deletions exporter/exporterhelper/queue_sender.go
@@ -16,7 +16,8 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
"go.opentelemetry.io/collector/exporter/exporterqueue"
"go.opentelemetry.io/collector/exporter/internal/queue"
"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
)

@@ -71,53 +72,34 @@ func (qCfg *QueueSettings) Validate() error {
type queueSender struct {
baseRequestSender
fullName string
queue internal.Queue[Request]
queue exporterqueue.Queue[Request]
traceAttribute attribute.KeyValue
logger *zap.Logger
meter otelmetric.Meter
consumers *internal.QueueConsumers[Request]
consumers *queue.Consumers[Request]

metricCapacity otelmetric.Int64ObservableGauge
metricSize otelmetric.Int64ObservableGauge
}

func newQueueSender(config QueueSettings, set exporter.CreateSettings, signal component.DataType,
marshaler RequestMarshaler, unmarshaler RequestUnmarshaler, consumeErrHandler func(error, Request)) *queueSender {

isPersistent := config.StorageID != nil
var queue internal.Queue[Request]
queueSizer := &internal.RequestSizer[Request]{}
if isPersistent {
queue = internal.NewPersistentQueue[Request](internal.PersistentQueueSettings[Request]{
Sizer: queueSizer,
Capacity: config.QueueSize,
DataType: signal,
StorageID: *config.StorageID,
Marshaler: marshaler,
Unmarshaler: unmarshaler,
ExporterSettings: set,
})
} else {
queue = internal.NewBoundedMemoryQueue[Request](internal.MemoryQueueSettings[Request]{
Sizer: queueSizer,
Capacity: config.QueueSize,
})
}
func newQueueSender(q exporterqueue.Queue[Request], set exporter.CreateSettings, numConsumers int,
exportFailureMessage string) *queueSender {
qs := &queueSender{
fullName: set.ID.String(),
queue: queue,
queue: q,
traceAttribute: attribute.String(obsmetrics.ExporterKey, set.ID.String()),
logger: set.TelemetrySettings.Logger,
meter: set.TelemetrySettings.MeterProvider.Meter(scopeName),
}
consumeFunc := func(ctx context.Context, req Request) error {
err := qs.nextSender.send(ctx, req)
if err != nil {
consumeErrHandler(err, req)
set.Logger.Error("Exporting failed. Dropping data."+exportFailureMessage,
zap.Error(err), zap.Int("dropped_items", req.ItemsCount()))
}
return err
}
qs.consumers = internal.NewQueueConsumers(queue, config.NumConsumers, consumeFunc)
qs.consumers = queue.NewQueueConsumers[Request](q, numConsumers, consumeFunc)
return qs
}
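
Since the changelog notes that `exporterqueue.Factory` implementations are intended to be used with `WithRequestQueue`, here is a sketch of a custom factory that delegates to the in-memory one. It assumes `Factory[T]` is the callable shape invoked in the diffs above, i.e. `factory(ctx, settings, cfg)` returning a `Queue[T]`:

```go
package example // hypothetical illustration, not part of this commit

import (
	"context"

	"go.opentelemetry.io/collector/exporter/exporterhelper"
	"go.opentelemetry.io/collector/exporter/exporterqueue"
)

// cappedMemoryFactory returns a custom factory that clamps the configured queue
// size before delegating to the in-memory factory. The Factory signature is
// assumed from how the factories are called in common.go above.
func cappedMemoryFactory(maxSize int) exporterqueue.Factory[exporterhelper.Request] {
	memory := exporterqueue.NewMemoryQueueFactory[exporterhelper.Request]()
	return func(ctx context.Context, set exporterqueue.Settings, cfg exporterqueue.Config) exporterqueue.Queue[exporterhelper.Request] {
		if cfg.QueueSize > maxSize {
			cfg.QueueSize = maxSize // QueueSize is a field shown in the common.go diff
		}
		return memory(ctx, set, cfg)
	}
}
```

The resulting factory could then be passed to `exporterhelper.WithRequestQueue` in place of the built-in factories.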
