diff --git a/exporter/exporterhelper/batch_sender.go b/exporter/exporterhelper/batch_sender.go
new file mode 100644
index 00000000000..360d4f5e7bb
--- /dev/null
+++ b/exporter/exporterhelper/batch_sender.go
@@ -0,0 +1,467 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package exporterhelper
+
+import (
+	"context"
+	"errors"
+	"runtime"
+	"sync"
+	"time"
+
+	"go.uber.org/zap"
+
+	"go.opentelemetry.io/collector/consumer/consumererror"
+	"go.opentelemetry.io/collector/exporter"
+	"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
+)
+
+// BaseBatchConfig defines a basic configuration for batching requests based on a timeout and a minimum number of
+// items.
+// All additional batching configurations should be embedded along with this struct.
+// This API is at the early stage of development and may change without backward compatibility
+// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
+type BaseBatchConfig struct {
+	// Enabled indicates whether requests should be batched before being sent to the consumerSender.
+	Enabled bool `mapstructure:"enabled"`
+
+	// Timeout sets the time after which a batch will be sent regardless of its size.
+	// When this is set to zero, batched data will be sent immediately.
+	// Setting it is recommended, as it ensures that the data is sent in a timely manner.
+	Timeout time.Duration `mapstructure:"timeout"` // Is there a better name to avoid confusion with the consumerSender timeout?
+
+	// MinSizeItems is the number of items (spans, data points or log records for OTLP) at which the batch should be
+	// sent regardless of the timeout. There is no guarantee that the batch size is always greater than this value.
+	// This option requires the Request to implement the RequestItemsCounter interface. Otherwise, it will be ignored.
+	MinSizeItems int `mapstructure:"min_size_items"`
+}
+
+// BatchConfigMaxItems defines the part of the batching configuration that sets a maximum number of items.
+// Should not be used directly; use it as part of a struct that embeds it.
+// This API is at the early stage of development and may change without backward compatibility
+// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
+type BatchConfigMaxItems struct {
+	// MaxSizeItems is the maximum number of batch items, i.e. spans, data points or log records for OTLP,
+	// but can be anything else for other formats. If the batch size exceeds this value,
+	// it will be broken up into smaller batches if possible.
+	MaxSizeItems int `mapstructure:"max_size_items"`
+}
+
+// BatchConfigBatchersLimit defines the part of the batching configuration that sets a maximum number of batchers.
+type BatchConfigBatchersLimit struct {
+	// BatchersLimit is the maximum number of batchers that can be used for batching.
+	// Requests producing batch identifiers that exceed this limit will be dropped.
+	// If this value is zero, there is no limit on the number of batchers.
+	BatchersLimit int `mapstructure:"batchers_limit"`
+}
+
+// BatchMergeFunc is a function that merges two requests into a single request.
+// Context will be propagated from the first request. If you want to separate batches based on the context,
+// use the WithRequestBatchIdentifier option.
+type BatchMergeFunc func(context.Context, Request, Request) (Request, error)
+
+// BatchMergeOrSplitFunc is a function that merges two requests into a single request or splits a request into multiple
+// requests if the merged request exceeds the maximum number of items.
+// If maxItems is zero, the function must always merge the requests into a single request.
+// Every returned request must have a number of items that does not exceed the maximum number of items.
+// The size of the last returned request must not be greater than the size of any other returned request.
+type BatchMergeOrSplitFunc func(ctx context.Context, req1 Request, req2 Request, maxItems int) ([]Request, error)
+
+// IdentifyRequestBatch returns an identifier for a Request. This function can be used to separate particular Requests
+// into different batches. Batches with different identifiers will not be merged together.
+// The provided context can be used to extract information and use it as a part of the identifier as well.
+// This function is optional. If not provided, all Requests will be batched together.
+type IdentifyRequestBatch func(ctx context.Context, r Request) string
+
+type batcherConfig struct {
+	enabled       bool
+	timeout       time.Duration
+	minSizeItems  int
+	maxSizeItems  int
+	batchersLimit int
+}
+
+// Batcher is a helper struct that can be used to batch requests before sending them to the next sender.
+// Use NewMergeBatcher or NewMergeOrSplitBatcher to create one.
+type Batcher struct {
+	cfg              batcherConfig
+	mergeFunc        func(context.Context, internal.Request, internal.Request) (internal.Request, error)
+	mergeOrSplitFunc func(ctx context.Context, req1 internal.Request, req2 internal.Request, maxItems int) ([]internal.Request, error)
+	batchIdentifier  IdentifyRequestBatch
+}
+
+// NewMergeBatcher creates a Batcher that merges requests using the provided BatchMergeFunc.
+func NewMergeBatcher(cfg BaseBatchConfig, mf BatchMergeFunc, opts ...BatcherOption) *Batcher {
+	b := &Batcher{
+		cfg: batcherConfig{
+			enabled:      cfg.Enabled,
+			timeout:      cfg.Timeout,
+			minSizeItems: cfg.MinSizeItems,
+		},
+		mergeFunc: func(ctx context.Context, req1 internal.Request, req2 internal.Request) (internal.Request, error) {
+			if req1 == nil {
+				return req2, nil
+			}
+			if req2 == nil {
+				return req1, nil
+			}
+			r, err := mf(ctx, req1, req2)
+			if err != nil {
+				return nil, err
+			}
+			return &request{
+				baseRequest: baseRequest{ctx: req1.Context()},
+				Request:     r,
+			}, nil
+		},
+	}
+	for _, op := range opts {
+		op(b)
+	}
+	return b
+}
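+
+// For illustration, a minimal merge function and batcher construction might look like the
+// following sketch. The tracesRequest type and its td (ptrace.Traces) field are hypothetical
+// stand-ins for an exporter's own Request implementation; only NewMergeBatcher,
+// BaseBatchConfig and BatchMergeFunc come from this file.
+//
+//	func mergeTraces(_ context.Context, r1 Request, r2 Request) (Request, error) {
+//		tr1, ok1 := r1.(*tracesRequest)
+//		tr2, ok2 := r2.(*tracesRequest)
+//		if !ok1 || !ok2 {
+//			return nil, errors.New("unexpected request type")
+//		}
+//		tr2.td.ResourceSpans().MoveAndAppendTo(tr1.td.ResourceSpans())
+//		return tr1, nil
+//	}
+//
+//	batcher := NewMergeBatcher(
+//		BaseBatchConfig{Enabled: true, Timeout: 200 * time.Millisecond, MinSizeItems: 8192},
+//		mergeTraces,
+//	)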
+
+// NewMergeOrSplitBatcher creates a Batcher that merges requests and splits them if the merged
+// request exceeds BatchConfigMaxItems.MaxSizeItems, using the provided BatchMergeOrSplitFunc.
+func NewMergeOrSplitBatcher(cfg BaseBatchConfig, bcmi BatchConfigMaxItems, msf BatchMergeOrSplitFunc, opts ...BatcherOption) *Batcher {
+	mergeOrSplitFunc := func(ctx context.Context, req1 internal.Request, req2 internal.Request, maxItems int) ([]internal.Request, error) {
+		if req1 == nil {
+			return []internal.Request{req2}, nil
+		}
+		if req2 == nil {
+			return []internal.Request{req1}, nil
+		}
+		r, err := msf(ctx, req1, req2, maxItems)
+		if err != nil {
+			return nil, err
+		}
+		reqs := make([]internal.Request, 0, len(r))
+		for _, req := range r {
+			reqs = append(reqs, &request{
+				baseRequest: baseRequest{ctx: req1.Context()},
+				Request:     req,
+			})
+		}
+		return reqs, nil
+	}
+	mergeFunc := func(ctx context.Context, req1 internal.Request, req2 internal.Request) (internal.Request, error) {
+		if req1 == nil {
+			return req2, nil
+		}
+		if req2 == nil {
+			return req1, nil
+		}
+		r, err := mergeOrSplitFunc(ctx, req1, req2, 0)
+		if err != nil {
+			return nil, err
+		}
+		if len(r) == 0 {
+			return nil, nil
+		}
+		// mergeOrSplitFunc must return at least one request if maxItems is zero.
+		// TODO: add an error log message here if r contains more than one request.
+		return r[0], nil
+	}
+	b := &Batcher{
+		cfg: batcherConfig{
+			enabled:      cfg.Enabled,
+			timeout:      cfg.Timeout,
+			minSizeItems: cfg.MinSizeItems,
+			maxSizeItems: bcmi.MaxSizeItems,
+		},
+		mergeOrSplitFunc: mergeOrSplitFunc,
+		mergeFunc:        mergeFunc,
+	}
+	for _, op := range opts {
+		op(b)
+	}
+	return b
+}
+
+// BatcherOption applies a change to a Batcher.
+type BatcherOption func(*Batcher)
+
+// WithRequestBatchIdentifier sets a function that splits incoming requests into separate
+// batches and limits the number of concurrently maintained batchers.
+func WithRequestBatchIdentifier(f IdentifyRequestBatch, bl BatchConfigBatchersLimit) BatcherOption {
+	return func(b *Batcher) {
+		b.batchIdentifier = f
+		b.cfg.batchersLimit = bl.BatchersLimit
+	}
+}
+
+// errTooManyBatchers is returned when the batchers limit has been reached.
+var errTooManyBatchers = consumererror.NewPermanent(errors.New("too many batch identifier combinations"))
+
+// batchSender is a component that accepts requests and places them into batches before
+// passing them to the downstream senders.
+//
+// Batches are sent out with any of the following conditions:
+//   - the batch size reaches cfg.minSizeItems
+//   - cfg.timeout has elapsed since the timestamp when the previous batch was sent out.
+type batchSender struct {
+	baseRequestSender
+	cfg              batcherConfig
+	mergeFunc        func(context.Context, internal.Request, internal.Request) (internal.Request, error)
+	mergeOrSplitFunc func(ctx context.Context, req1 internal.Request, req2 internal.Request, maxItems int) ([]internal.Request, error)
+	batchIdentifier  IdentifyRequestBatch
+
+	logger *zap.Logger
+
+	// identifyBatchFunc is a function that returns a key
+	// identifying the batch to which the request should be added.
+	identifyBatchFunc func(internal.Request) string
+
+	shutdownC  chan struct{}
+	goroutines sync.WaitGroup
+
+	// batcher will be either *singleShardBatcher or *multiShardBatcher
+	batcher batcher
+}
+
+// newBatchSender returns a new batch sender component.
+func newBatchSender(set exporter.CreateSettings, b Batcher) *batchSender {
+	bs := &batchSender{
+		cfg:              b.cfg,
+		mergeFunc:        b.mergeFunc,
+		mergeOrSplitFunc: b.mergeOrSplitFunc,
+		batchIdentifier:  b.batchIdentifier,
+		logger:           set.Logger,
+		shutdownC:        make(chan struct{}, 1),
+	}
+	if b.batchIdentifier == nil {
+		bs.batcher = &singleShardBatcher{batcher: bs.newShard()}
+	} else {
+		bs.batcher = &multiShardBatcher{batchSender: bs}
+	}
+	return bs
+}
+
+// newShard creates a new batching shard and starts its processing goroutine.
+func (bs *batchSender) newShard() *shard {
+	b := &shard{
+		consumer:   bs,
+		exportCtx:  context.Background(),
+		newRequest: make(chan internal.Request, runtime.NumCPU()),
+	}
+	go b.start()
+	return b
+}
+
+// shutdown is invoked during service shutdown.
+func (bs *batchSender) shutdown() {
+	close(bs.shutdownC)
+
+	// Wait until all goroutines are done.
+	bs.goroutines.Wait()
+}
+
+func (bs *batchSender) send(req internal.Request) error {
+	// identifyBatch is nil-safe, unlike calling identifyBatchFunc directly.
+	s, err := bs.batcher.shard(bs.identifyBatch(req))
+	if err != nil {
+		return err
+	}
+
+	// For now the batcher can work asynchronously only. Potentially we can
+	// add a sync mode later.
+	s.newRequest <- req
+	return nil
+}
+
+func (bs *batchSender) identifyBatch(req internal.Request) string {
+	if bs.identifyBatchFunc != nil {
+		return bs.identifyBatchFunc(req)
+	}
+	return ""
+}
+
+type batcher interface {
+	shard(id string) (*shard, error)
+	shardsCount() int
+}
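+
+// Request flow, as a sketch of the intended behavior: batchSender.send resolves a shard
+// from the request's batch identifier (a single shared shard when no identifier function
+// is configured) and pushes the request onto that shard's channel. Each shard runs a
+// goroutine (shard.start) that merges incoming requests into its pending batch, exports
+// the batch once it reaches minSizeItems, and flushes whatever is pending when the
+// timeout timer fires or on shutdown.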
+
+// shard is a single instance of the batch logic. When batch
+// identifiers are in use, one of these is created per distinct
+// identifier value.
+type shard struct {
+	// consumer refers to the owning batchSender, for access to common
+	// configuration.
+	consumer *batchSender
+
+	// exportCtx is a context with the metadata key-values
+	// corresponding with this shard set.
+	exportCtx context.Context
+
+	// timer informs the shard to send a batch.
+	timer *time.Timer
+
+	// newRequest is used to receive requests from producers.
+	newRequest chan internal.Request
+
+	// batch is the in-flight, not yet exported batch of requests.
+	batch internal.Request
+
+	lock sync.Mutex
+}
+
+func (s *shard) start() {
+	s.consumer.goroutines.Add(1)
+	defer s.consumer.goroutines.Done()
+
+	// When the timeout is zero, no timer is created and the timer channel
+	// stays nil, so the corresponding select case never fires.
+	var timerC <-chan time.Time
+	if s.consumer.cfg.timeout > 0 {
+		s.timer = time.NewTimer(s.consumer.cfg.timeout)
+		timerC = s.timer.C
+	}
+
+	for {
+		select {
+		case <-s.consumer.shutdownC:
+			// Drain the requests that are already enqueued, then flush
+			// the pending batch before returning.
+		DONE:
+			for {
+				select {
+				case req := <-s.newRequest:
+					s.processRequest(req)
+				default:
+					break DONE
+				}
+			}
+			if s.batch != nil && s.batch.Count() > 0 {
+				s.export(s.batch)
+			}
+			return
+		case req := <-s.newRequest:
+			if req == nil {
+				continue
+			}
+			s.processRequest(req)
+		case <-timerC:
+			if s.batch != nil && s.batch.Count() > 0 {
+				s.export(s.batch)
+				s.batch = nil
+			}
+			s.resetTimer()
+		}
+	}
+}
+
+func (s *shard) processRequest(req internal.Request) {
+	var sent bool
+	if s.consumer.mergeOrSplitFunc != nil {
+		sent = s.processRequestMergeOrSplit(req)
+	} else {
+		s.processRequestMerge(req)
+	}
+
+	if s.batch != nil && s.batch.Count() >= s.consumer.cfg.minSizeItems {
+		s.export(s.batch)
+		s.batch = nil
+		sent = true
+	}
+
+	if sent {
+		s.stopTimer()
+		s.resetTimer()
+	}
+}
+
+func (s *shard) processRequestMergeOrSplit(req internal.Request) bool {
+	reqs, err := s.consumer.mergeOrSplitFunc(s.exportCtx, s.batch, req, s.consumer.cfg.maxSizeItems)
+	if err != nil {
+		s.consumer.logger.Error("Failed to merge or split the request", zap.Error(err))
+		return false
+	}
+	if len(reqs) == 0 {
+		return false
+	}
+	sent := false
+	// All returned requests except the last one are full batches and can be
+	// exported right away. The last one stays pending in the shard.
+	for _, r := range reqs[:len(reqs)-1] {
+		s.export(r)
+		sent = true
+	}
+	s.batch = reqs[len(reqs)-1]
+	return sent
+}
+
+func (s *shard) processRequestMerge(req internal.Request) {
+	merged, err := s.consumer.mergeFunc(s.exportCtx, s.batch, req)
+	if err != nil {
+		s.consumer.logger.Error("Failed to merge the request", zap.Error(err))
+		return
+	}
+	// Keep the merged request pending; processRequest exports it once it
+	// reaches the minimum batch size.
+	s.batch = merged
+}
+
+func (s *shard) export(req internal.Request) {
+	if req == nil {
+		return
+	}
+
+	err := s.consumer.nextSender.send(req)
+	// TODO: Replace with metrics and logging.
+	if err != nil {
+		s.consumer.logger.Error("Failed to send batch", zap.Error(err))
+	}
+}
+
+func (s *shard) hasTimer() bool {
+	return s.timer != nil
+}
+
+func (s *shard) stopTimer() {
+	if s.hasTimer() && !s.timer.Stop() {
+		<-s.timer.C
+	}
+}
+
+func (s *shard) resetTimer() {
+	if s.hasTimer() {
+		s.timer.Reset(s.consumer.cfg.timeout)
+	}
+}
+
+// singleShardBatcher is used when no batch identifier function is provided,
+// to avoid the additional lock and map operations used in multiShardBatcher.
+type singleShardBatcher struct {
+	batcher *shard
+}
+
+func (sb *singleShardBatcher) shard(_ string) (*shard, error) {
+	return sb.batcher, nil
+}
+
+func (sb *singleShardBatcher) shardsCount() int {
+	return 1
+}
+
+// multiShardBatcher is used when a batch identifier function is provided.
+type multiShardBatcher struct {
+	*batchSender
+	batchers sync.Map
+
+	// Guards the size and the storing logic to ensure no more than limit items are stored.
+	// If we are willing to allow "some" extra items over the limit, this can be removed and size can be made atomic.
+	lock sync.Mutex
+	size int
+}
+
+func (mb *multiShardBatcher) shard(id string) (*shard, error) {
+	s, ok := mb.batchers.Load(id)
+	if ok {
+		return s.(*shard), nil
+	}
+
+	mb.lock.Lock()
+	defer mb.lock.Unlock()
+
+	// Re-check under the lock to avoid creating a shard (and its goroutine)
+	// that would immediately be discarded by a concurrent caller.
+	if s, ok = mb.batchers.Load(id); ok {
+		return s.(*shard), nil
+	}
+
+	if mb.batchSender.cfg.batchersLimit != 0 && mb.size >= mb.batchSender.cfg.batchersLimit {
+		return nil, errTooManyBatchers
+	}
+
+	newS := mb.newShard()
+	mb.batchers.Store(id, newS)
+	mb.size++
+	return newS, nil
+}
+
+func (mb *multiShardBatcher) shardsCount() int {
+	mb.lock.Lock()
+	defer mb.lock.Unlock()
+	return mb.size
+}
diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go
index f6277404619..13dc211d92e 100644
--- a/exporter/exporterhelper/common.go
+++ b/exporter/exporterhelper/common.go
@@ -137,6 +137,15 @@ func WithCapabilities(capabilities consumer.Capabilities) Option {
 	}
 }
 
+func WithBatcher(batcher Batcher) Option {
+	return func(o *baseExporter) {
+		if !o.requestExporter {
+			panic("batching is only available for the new request exporters")
+		}
+		o.batchSender = newBatchSender(o.set, batcher)
+	}
+}
+
 // baseExporter contains common fields between different exporter types.
 type baseExporter struct {
 	component.StartFunc
@@ -154,6 +163,7 @@ type baseExporter struct {
 	// Chain of senders that the exporter helper applies before passing the data to the actual exporter.
 	// The data is handled by each sender in the respective order starting from the queueSender.
 	// Most of the senders are optional, and initialized with a no-op path-through sender.
+	batchSender  requestSender
 	queueSender  requestSender
 	obsrepSender requestSender
 	retrySender  requestSender
@@ -180,6 +190,7 @@ func newBaseExporter(set exporter.CreateSettings, signal component.DataType, req
 		unmarshaler: unmarshaler,
 		signal:      signal,
 
+		batchSender:  &baseRequestSender{},
 		queueSender:  &baseRequestSender{},
 		obsrepSender: osf(obsrep),
 		retrySender:  &baseRequestSender{},
@@ -200,11 +211,12 @@ func newBaseExporter(set exporter.CreateSettings, signal component.DataType, req
 
 // send sends the request using the first sender in the chain.
 func (be *baseExporter) send(req internal.Request) error {
-	return be.queueSender.send(req)
+	return be.batchSender.send(req)
 }
 
 // connectSenders connects the senders in the predefined order.
 func (be *baseExporter) connectSenders() {
+	be.batchSender.setNextSender(be.queueSender)
 	be.queueSender.setNextSender(be.obsrepSender)
 	be.obsrepSender.setNextSender(be.retrySender)
 	be.retrySender.setNextSender(be.timeoutSender)
diff --git a/exporter/exporterhelper/logs.go b/exporter/exporterhelper/logs.go
index b098e722921..682d42a5072 100644
--- a/exporter/exporterhelper/logs.go
+++ b/exporter/exporterhelper/logs.go
@@ -98,6 +98,7 @@ func NewLogsExporter(
 	lc, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error {
 		req := newLogsRequest(ctx, ld, pusher)
 		serr := be.send(req)
+		// TODO: Check for the queue overflow before converting the data.
 		if errors.Is(serr, errSendingQueueIsFull) {
 			be.obsrep.recordLogsEnqueueFailure(req.Context(), int64(req.Count()))
 		}
diff --git a/exporter/exporterhelper/metrics.go b/exporter/exporterhelper/metrics.go
index e3f09f361c7..fee7326b9e1 100644
--- a/exporter/exporterhelper/metrics.go
+++ b/exporter/exporterhelper/metrics.go
@@ -150,6 +150,7 @@ func NewMetricsRequestExporter(
 	}
 	r := newRequest(ctx, req)
 	sErr := be.send(r)
+	// TODO: Check for the queue overflow before converting the data.
 	if errors.Is(sErr, errSendingQueueIsFull) {
 		be.obsrep.recordMetricsEnqueueFailure(r.Context(), int64(r.Count()))
 	}
diff --git a/exporter/exporterhelper/traces.go b/exporter/exporterhelper/traces.go
index 4b9e397ec43..b6439a2dd9f 100644
--- a/exporter/exporterhelper/traces.go
+++ b/exporter/exporterhelper/traces.go
@@ -150,6 +150,7 @@ func NewTracesRequestExporter(
 	}
 	r := newRequest(ctx, req)
 	sErr := be.send(r)
+	// TODO: Check for the queue overflow before converting the data.
 	if errors.Is(sErr, errSendingQueueIsFull) {
 		be.obsrep.recordTracesEnqueueFailure(r.Context(), int64(r.Count()))
 	}
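
Taken together, the pieces above wire up as shown in the following minimal sketch. The
mergeTraces function is the hypothetical BatchMergeFunc from the comment example in
batch_sender.go; NewMergeBatcher, BaseBatchConfig, and WithBatcher are introduced by this
change.

	package tracesexporter

	import (
		"context"
		"time"

		"go.opentelemetry.io/collector/exporter/exporterhelper"
	)

	// mergeTraces is a placeholder BatchMergeFunc; a real implementation would append the
	// second request's spans to the first, as sketched in batch_sender.go.
	func mergeTraces(_ context.Context, r1 exporterhelper.Request, _ exporterhelper.Request) (exporterhelper.Request, error) {
		return r1, nil // placeholder only: a real merge must combine both requests
	}

	// newBatcherOption builds the exporter Option that enables batching.
	func newBatcherOption() exporterhelper.Option {
		batcher := exporterhelper.NewMergeBatcher(
			exporterhelper.BaseBatchConfig{
				Enabled:      true,
				Timeout:      200 * time.Millisecond, // flush at most 200ms after the first buffered request
				MinSizeItems: 8192,                   // or earlier, once 8192 items are pending
			},
			mergeTraces,
		)
		return exporterhelper.WithBatcher(*batcher)
	}

The returned option is then passed to NewTracesRequestExporter (or the metrics/logs
equivalents) alongside the exporter's other options.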