diff --git a/.chloggen/elasticsearch-sync-bulkindexer.yaml b/.chloggen/elasticsearch-sync-bulkindexer.yaml
new file mode 100644
index 000000000000..acbe1d796247
--- /dev/null
+++ b/.chloggen/elasticsearch-sync-bulkindexer.yaml
@@ -0,0 +1,29 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern (e.g. filelogreceiver)
+component: elasticsearchexporter
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Add opt-in support for the experimental `batcher` config
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [32377]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext: |
+  Enabling (or explicitly disabling) the batcher disables the Elasticsearch exporter's
+  existing batching/buffering logic; the batch sender is used instead.
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user]
diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md
index eaff8a122b62..ae2efcb0fb5a 100644
--- a/exporter/elasticsearchexporter/README.md
+++ b/exporter/elasticsearchexporter/README.md
@@ -81,6 +81,33 @@ All other defaults are as defined by [confighttp].
 The Elasticsearch exporter supports the common [`sending_queue` settings][exporterhelper].
 However, the sending queue is currently disabled by default.
 
+### Batching
+
+> [!WARNING]
+> The `batcher` config is experimental and may change without notice.
+
+The Elasticsearch exporter supports the [common `batcher` settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterbatcher/config.go).
+
+- `batcher`:
+  - `enabled` (default=unset): Enable batching of requests into a single bulk request.
+  - `min_size_items` (default=5000): Minimum number of log records or spans in the buffer that triggers an immediate flush.
+  - `max_size_items` (default=10000): Maximum number of log records or spans in a single request.
+  - `flush_timeout` (default=30s): Maximum age of the oldest item in the buffer; once reached, a flush happens regardless of buffer size.
+
+By default, the exporter performs its own buffering and batching, as configured through the
+`flush` config, and `batcher` is unused. Setting `batcher::enabled` to either `true` or
+`false` disables the exporter's own buffering and batching, and causes the `flush` config to
+be ignored. In a future release, when the `batcher` config is stable and has feature parity
+with the exporter's existing `flush` config, it will be enabled by default.
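+
+For example, the following configuration opts in to the batcher; the endpoint and
+threshold values shown here are illustrative, not recommendations:
+
+```yaml
+exporters:
+  elasticsearch:
+    endpoint: https://elastic.example.com:9200
+    batcher:
+      enabled: true
+      flush_timeout: 5s
+      min_size_items: 1000
+      max_size_items: 10000
+```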
+
+Using the common `batcher` functionality provides several benefits over the default behavior:
+ - Combined with a persistent queue, or no queue at all, `batcher` enables at-least-once delivery.
+   With the default behavior, the exporter accepts data and processes it asynchronously,
+   which interacts poorly with queuing.
+ - By ensuring the exporter makes requests to Elasticsearch synchronously,
+   client metadata can be passed through to Elasticsearch requests,
+   e.g. by using the [`headers_setter` extension](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/extension/headerssetterextension/README.md).
+
 ### Elasticsearch document routing
 
 Telemetry data will be written to signal specific data streams by default:
@@ -169,6 +196,9 @@ The behaviour of this bulk indexing can be configured with the following setting
 - `max_interval` (default=1m): Max waiting time if a HTTP request failed.
 - `retry_on_status` (default=[429, 500, 502, 503, 504]): Status codes that trigger request or document level retries. Request level retry and document level retry status codes are shared and cannot be configured separately. To avoid duplicates, it is recommended to set it to `[429]`. WARNING: The default will be changed to `[429]` in the future.
 
+> [!NOTE]
+> The `flush` config will be ignored when the `batcher::enabled` config is explicitly set to `true` or `false`.
+
 ### Elasticsearch node discovery
 
 The Elasticsearch Exporter will regularly check Elasticsearch for available nodes.
diff --git a/exporter/elasticsearchexporter/bulkindexer.go b/exporter/elasticsearchexporter/bulkindexer.go
index 5276d234acd1..4d3ab6068ebf 100644
--- a/exporter/elasticsearchexporter/bulkindexer.go
+++ b/exporter/elasticsearchexporter/bulkindexer.go
@@ -5,7 +5,7 @@ package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry
 
 import (
 	"context"
-	"fmt"
+	"errors"
 	"io"
 	"runtime"
 	"sync"
@@ -51,10 +51,103 @@ type bulkIndexerSession interface {
 }
 
 func newBulkIndexer(logger *zap.Logger, client *elasticsearch.Client, config *Config) (bulkIndexer, error) {
-	// TODO: add support for synchronous bulk indexing, to integrate with the exporterhelper batch sender.
+	if config.Batcher.Enabled != nil {
+		return newSyncBulkIndexer(logger, client, config), nil
+	}
 	return newAsyncBulkIndexer(logger, client, config)
 }
 
+func newSyncBulkIndexer(logger *zap.Logger, client *elasticsearch.Client, config *Config) *syncBulkIndexer {
+	var maxDocRetry int
+	if config.Retry.Enabled {
+		// max_requests includes the initial attempt.
+		// See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/32344
+		maxDocRetry = config.Retry.MaxRequests - 1
+	}
+	return &syncBulkIndexer{
+		config: docappender.BulkIndexerConfig{
+			Client:                client,
+			MaxDocumentRetries:    maxDocRetry,
+			Pipeline:              config.Pipeline,
+			RetryOnDocumentStatus: config.Retry.RetryOnStatus,
+		},
+		flushTimeout: config.Timeout,
+		retryConfig:  config.Retry,
+		logger:       logger,
+	}
+}
+
+type syncBulkIndexer struct {
+	config       docappender.BulkIndexerConfig
+	flushTimeout time.Duration
+	retryConfig  RetrySettings
+	logger       *zap.Logger
+}
+
+// StartSession creates a new docappender.BulkIndexer, and wraps
+// it with a syncBulkIndexerSession.
+func (s *syncBulkIndexer) StartSession(context.Context) (bulkIndexerSession, error) {
+	bi, err := docappender.NewBulkIndexer(s.config)
+	if err != nil {
+		return nil, err
+	}
+	return &syncBulkIndexerSession{
+		s:  s,
+		bi: bi,
+	}, nil
+}
+
+// Close is a no-op.
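+// There is nothing to release here: the sync bulk indexer holds only configuration,
+// and each session creates (and owns) its own docappender.BulkIndexer in StartSession.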
+func (s *syncBulkIndexer) Close(context.Context) error {
+	return nil
+}
+
+type syncBulkIndexerSession struct {
+	s  *syncBulkIndexer
+	bi *docappender.BulkIndexer
+}
+
+// Add adds an item to the sync bulk indexer session.
+func (s *syncBulkIndexerSession) Add(_ context.Context, index string, document io.WriterTo) error {
+	return s.bi.Add(docappender.BulkIndexerItem{Index: index, Body: document})
+}
+
+// End is a no-op.
+func (s *syncBulkIndexerSession) End() {
+	// TODO: acquire docappender.BulkIndexer from a pool in StartSession, and release it here
+}
+
+// Flush flushes documents added to the bulk indexer session.
+func (s *syncBulkIndexerSession) Flush(ctx context.Context) error {
+	var retryBackoff func(int) time.Duration
+	for attempts := 0; ; attempts++ {
+		if _, err := flushBulkIndexer(ctx, s.bi, s.s.flushTimeout, s.s.logger); err != nil {
+			return err
+		}
+		if s.bi.Items() == 0 {
+			// No documents in the buffer waiting for per-document retry; exit the retry loop.
+			return nil
+		}
+		if retryBackoff == nil {
+			retryBackoff = createElasticsearchBackoffFunc(&s.s.retryConfig)
+			if retryBackoff == nil {
+				// BUG: This should never happen in practice.
+				// When retry is disabled or the document-level retry limit is reached,
+				// documents should go into FailedDocs instead of the indexer buffer.
+				return errors.New("bulk indexer contains documents pending retry but retry is disabled")
+			}
+		}
+		backoff := retryBackoff(attempts + 1) // TODO: use exporterhelper retry_sender
+		timer := time.NewTimer(backoff)
+		select {
+		case <-ctx.Done():
+			timer.Stop()
+			return ctx.Err()
+		case <-timer.C:
+		}
+	}
+}
+
 func newAsyncBulkIndexer(logger *zap.Logger, client *elasticsearch.Client, config *Config) (*asyncBulkIndexer, error) {
 	numWorkers := config.NumWorkers
 	if numWorkers == 0 {
@@ -215,18 +308,32 @@ func (w *asyncBulkIndexerWorker) run() {
 
 func (w *asyncBulkIndexerWorker) flush() {
 	ctx := context.Background()
-	if w.flushTimeout > 0 {
+	stat, _ := flushBulkIndexer(ctx, w.indexer, w.flushTimeout, w.logger)
+	w.stats.docsIndexed.Add(stat.Indexed)
+}
+
+func flushBulkIndexer(
+	ctx context.Context,
+	bi *docappender.BulkIndexer,
+	timeout time.Duration,
+	logger *zap.Logger,
+) (docappender.BulkIndexerResponseStat, error) {
+	if timeout > 0 {
 		var cancel context.CancelFunc
-		ctx, cancel = context.WithTimeout(context.Background(), w.flushTimeout)
+		ctx, cancel = context.WithTimeout(ctx, timeout)
 		defer cancel()
 	}
-	stat, err := w.indexer.Flush(ctx)
-	w.stats.docsIndexed.Add(stat.Indexed)
+	stat, err := bi.Flush(ctx)
 	if err != nil {
-		w.logger.Error("bulk indexer flush error", zap.Error(err))
+		logger.Error("bulk indexer flush error", zap.Error(err))
 	}
 	for _, resp := range stat.FailedDocs {
-		w.logger.Error(fmt.Sprintf("Drop docs: failed to index: %#v", resp.Error),
-			zap.Int("status", resp.Status))
+		logger.Error(
+			"failed to index document",
+			zap.String("index", resp.Index),
+			zap.String("error.type", resp.Error.Type),
+			zap.String("error.reason", resp.Error.Reason),
+		)
 	}
+	return stat, err
 }
diff --git a/exporter/elasticsearchexporter/config.go b/exporter/elasticsearchexporter/config.go
index e5a7b28fdbda..35a9af43e112 100644
--- a/exporter/elasticsearchexporter/config.go
+++ b/exporter/elasticsearchexporter/config.go
@@ -14,6 +14,7 @@ import (
 
 	"go.opentelemetry.io/collector/config/confighttp"
 	"go.opentelemetry.io/collector/config/configopaque"
+	"go.opentelemetry.io/collector/exporter/exporterbatcher"
 	"go.opentelemetry.io/collector/exporter/exporterhelper"
 	"go.uber.org/zap"
 )
@@ -76,6 +77,31 @@ type Config struct {
 	// TelemetrySettings contains settings useful for testing/debugging purposes
 	// This is experimental and may change at any time.
 	TelemetrySettings `mapstructure:"telemetry"`
+
+	// Batcher holds configuration for batching requests based on timeout
+	// and size-based thresholds.
+	//
+	// Batcher is unused by default, in which case Flush will be used.
+	// If Batcher.Enabled is non-nil (i.e. batcher::enabled is specified),
+	// then the Flush config will be ignored even if Batcher.Enabled is false.
+	Batcher BatcherConfig `mapstructure:"batcher"`
+}
+
+// BatcherConfig holds configuration for exporterbatcher.
+//
+// This is a slightly modified version of exporterbatcher.Config,
+// to enable tri-state Enabled: unset, false, true.
+type BatcherConfig struct {
+	// Enabled indicates whether to enqueue batches before sending
+	// to the exporter. If Enabled is specified (non-nil),
+	// then the exporter will not perform any buffering itself.
+	Enabled *bool `mapstructure:"enabled"`
+
+	// FlushTimeout sets the time after which a batch will be sent regardless of its size.
+	FlushTimeout time.Duration `mapstructure:"flush_timeout"`
+
+	exporterbatcher.MinSizeConfig `mapstructure:",squash"`
+	exporterbatcher.MaxSizeConfig `mapstructure:",squash"`
 }
 
 type TelemetrySettings struct {
diff --git a/exporter/elasticsearchexporter/config_test.go b/exporter/elasticsearchexporter/config_test.go
index b6268b154545..d831801721dd 100644
--- a/exporter/elasticsearchexporter/config_test.go
+++ b/exporter/elasticsearchexporter/config_test.go
@@ -16,6 +16,7 @@ import (
 	"go.opentelemetry.io/collector/config/confighttp"
 	"go.opentelemetry.io/collector/config/configopaque"
 	"go.opentelemetry.io/collector/confmap/confmaptest"
+	"go.opentelemetry.io/collector/exporter/exporterbatcher"
 	"go.opentelemetry.io/collector/exporter/exporterhelper"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/metadata"
@@ -107,6 +108,15 @@ func TestConfig(t *testing.T) {
 					PrefixSeparator: "-",
 					DateFormat:      "%Y.%m.%d",
 				},
+				Batcher: BatcherConfig{
+					FlushTimeout: 30 * time.Second,
+					MinSizeConfig: exporterbatcher.MinSizeConfig{
+						MinSizeItems: 5000,
+					},
+					MaxSizeConfig: exporterbatcher.MaxSizeConfig{
+						MaxSizeItems: 10000,
+					},
+				},
 			},
 		},
 		{
@@ -168,6 +178,15 @@ func TestConfig(t *testing.T) {
 					PrefixSeparator: "-",
 					DateFormat:      "%Y.%m.%d",
 				},
+				Batcher: BatcherConfig{
+					FlushTimeout: 30 * time.Second,
+					MinSizeConfig: exporterbatcher.MinSizeConfig{
+						MinSizeItems: 5000,
+					},
+					MaxSizeConfig: exporterbatcher.MaxSizeConfig{
+						MaxSizeItems: 10000,
+					},
+				},
 			},
 		},
 		{
@@ -229,6 +248,15 @@ func TestConfig(t *testing.T) {
 					PrefixSeparator: "-",
 					DateFormat:      "%Y.%m.%d",
 				},
+				Batcher: BatcherConfig{
+					FlushTimeout: 30 * time.Second,
+					MinSizeConfig: exporterbatcher.MinSizeConfig{
+						MinSizeItems: 5000,
+					},
+					MaxSizeConfig: exporterbatcher.MaxSizeConfig{
+						MaxSizeItems: 10000,
+					},
+				},
 			},
 		},
 		{
@@ -263,6 +291,16 @@ func TestConfig(t *testing.T) {
 				cfg.Endpoint = "https://elastic.example.com:9200"
 			}),
 		},
+		{
+			id:         component.NewIDWithName(metadata.Type, "batcher_disabled"),
+			configFile: "config.yaml",
+			expected: withDefaultConfig(func(cfg *Config) {
+				cfg.Endpoint = "https://elastic.example.com:9200"
+
+				enabled := false
+				cfg.Batcher.Enabled = &enabled
+			}),
+		},
 	}
 
 	for _, tt := range tests {
diff --git a/exporter/elasticsearchexporter/exporter.go b/exporter/elasticsearchexporter/exporter.go
index ade3855c2b2a..b16f44388d1f 100644
--- a/exporter/elasticsearchexporter/exporter.go
+++ b/exporter/elasticsearchexporter/exporter.go
@@ -9,6 +9,7 @@ import (
 	"errors"
 	"fmt"
 	"runtime"
+	"sync"
 	"time"
 
 	"go.opentelemetry.io/collector/component"
@@ -31,6 +32,7 @@ type elasticsearchExporter struct {
 	dynamicIndex bool
 	model        mappingModel
 
+	wg          sync.WaitGroup // active sessions
 	bulkIndexer bulkIndexer
 }
@@ -84,12 +86,28 @@ func (e *elasticsearchExporter) Start(ctx context.Context, host component.Host)
 
 func (e *elasticsearchExporter) Shutdown(ctx context.Context) error {
 	if e.bulkIndexer != nil {
-		return e.bulkIndexer.Close(ctx)
+		if err := e.bulkIndexer.Close(ctx); err != nil {
+			return err
+		}
+	}
+
+	doneCh := make(chan struct{})
+	go func() {
+		e.wg.Wait()
+		close(doneCh)
+	}()
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case <-doneCh:
+		return nil
 	}
-	return nil
 }
 
 func (e *elasticsearchExporter) pushLogsData(ctx context.Context, ld plog.Logs) error {
+	e.wg.Add(1)
+	defer e.wg.Done()
+
 	session, err := e.bulkIndexer.StartSession(ctx)
 	if err != nil {
 		return err
@@ -158,6 +176,9 @@ func (e *elasticsearchExporter) pushMetricsData(
 	ctx context.Context,
 	metrics pmetric.Metrics,
 ) error {
+	e.wg.Add(1)
+	defer e.wg.Done()
+
 	session, err := e.bulkIndexer.StartSession(ctx)
 	if err != nil {
 		return err
@@ -296,6 +317,9 @@ func (e *elasticsearchExporter) pushTraceData(
 	ctx context.Context,
 	td ptrace.Traces,
 ) error {
+	e.wg.Add(1)
+	defer e.wg.Done()
+
 	session, err := e.bulkIndexer.StartSession(ctx)
 	if err != nil {
 		return err
diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go
index eae62957f680..72cb2a3741e6 100644
--- a/exporter/elasticsearchexporter/exporter_test.go
+++ b/exporter/elasticsearchexporter/exporter_test.go
@@ -912,6 +912,43 @@ func TestExporterAuth(t *testing.T) {
 	<-done
 }
 
+func TestExporterBatcher(t *testing.T) {
+	var requests []*http.Request
+	testauthID := component.NewID(component.MustNewType("authtest"))
+	batcherEnabled := false // sync bulk indexer is used without batching
+	exporter := newUnstartedTestLogsExporter(t, "http://testing.invalid", func(cfg *Config) {
+		cfg.Batcher = BatcherConfig{Enabled: &batcherEnabled}
+		cfg.Auth = &configauth.Authentication{AuthenticatorID: testauthID}
+	})
+	err := exporter.Start(context.Background(), &mockHost{
+		extensions: map[component.ID]component.Component{
+			testauthID: &authtest.MockClient{
+				ResultRoundTripper: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
+					requests = append(requests, req)
+					return nil, errors.New("nope")
+				}),
+			},
+		},
+	})
+	require.NoError(t, err)
+	defer func() {
+		require.NoError(t, exporter.Shutdown(context.Background()))
+	}()
+
+	logs := plog.NewLogs()
+	resourceLogs := logs.ResourceLogs().AppendEmpty()
+	scopeLogs := resourceLogs.ScopeLogs().AppendEmpty()
+	scopeLogs.LogRecords().AppendEmpty().Body().SetStr("log record body")
+
+	type key struct{}
+	_ = exporter.ConsumeLogs(context.WithValue(context.Background(), key{}, "value1"), logs)
+	_ = exporter.ConsumeLogs(context.WithValue(context.Background(), key{}, "value2"), logs)
+	require.Len(t, requests, 2) // flushed immediately by Consume
+
+	assert.Equal(t, "value1", requests[0].Context().Value(key{}))
+	assert.Equal(t, "value2", requests[1].Context().Value(key{}))
+}
+
 func newTestTracesExporter(t *testing.T, url string, fns ...func(*Config)) exporter.Traces {
 	f := NewFactory()
 	cfg := withDefaultConfig(append([]func(*Config){func(cfg *Config) {
diff --git a/exporter/elasticsearchexporter/factory.go b/exporter/elasticsearchexporter/factory.go
index 289b8ed62bda..568889281892 100644
--- a/exporter/elasticsearchexporter/factory.go
+++ b/exporter/elasticsearchexporter/factory.go
@@ -15,6 +15,7 @@ import (
 	"go.opentelemetry.io/collector/config/confighttp"
 	"go.opentelemetry.io/collector/consumer"
 	"go.opentelemetry.io/collector/exporter"
+	"go.opentelemetry.io/collector/exporter/exporterbatcher"
 	"go.opentelemetry.io/collector/exporter/exporterhelper"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/metadata"
@@ -87,6 +88,15 @@ func createDefaultConfig() component.Config {
 			LogRequestBody:  false,
 			LogResponseBody: false,
 		},
+		Batcher: BatcherConfig{
+			FlushTimeout: 30 * time.Second,
+			MinSizeConfig: exporterbatcher.MinSizeConfig{
+				MinSizeItems: 5000,
+			},
+			MaxSizeConfig: exporterbatcher.MaxSizeConfig{
+				MaxSizeItems: 10000,
+			},
+		},
 	}
 }
 
@@ -117,10 +127,7 @@ func createLogsExporter(
 		set,
 		cfg,
 		exporter.pushLogsData,
-		exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: true}),
-		exporterhelper.WithStart(exporter.Start),
-		exporterhelper.WithShutdown(exporter.Shutdown),
-		exporterhelper.WithQueue(cf.QueueSettings),
+		exporterhelperOptions(cf, exporter.Start, exporter.Shutdown)...,
 	)
 }
 
@@ -141,10 +148,7 @@ func createMetricsExporter(
 		set,
 		cfg,
 		exporter.pushMetricsData,
-		exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: true}),
-		exporterhelper.WithStart(exporter.Start),
-		exporterhelper.WithShutdown(exporter.Shutdown),
-		exporterhelper.WithQueue(cf.QueueSettings),
+		exporterhelperOptions(cf, exporter.Start, exporter.Shutdown)...,
 	)
 }
 
@@ -164,9 +168,35 @@ func createTracesExporter(ctx context.Context,
 		set,
 		cfg,
 		exporter.pushTraceData,
-		exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: true}),
-		exporterhelper.WithStart(exporter.Start),
-		exporterhelper.WithShutdown(exporter.Shutdown),
-		exporterhelper.WithQueue(cf.QueueSettings),
+		exporterhelperOptions(cf, exporter.Start, exporter.Shutdown)...,
 	)
 }
+
+func exporterhelperOptions(
+	cfg *Config,
+	start component.StartFunc,
+	shutdown component.ShutdownFunc,
+) []exporterhelper.Option {
+	opts := []exporterhelper.Option{
+		exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: true}),
+		exporterhelper.WithStart(start),
+		exporterhelper.WithShutdown(shutdown),
+		exporterhelper.WithQueue(cfg.QueueSettings),
+	}
+	if cfg.Batcher.Enabled != nil {
+		batcherConfig := exporterbatcher.Config{
+			Enabled:       *cfg.Batcher.Enabled,
+			FlushTimeout:  cfg.Batcher.FlushTimeout,
+			MinSizeConfig: cfg.Batcher.MinSizeConfig,
+			MaxSizeConfig: cfg.Batcher.MaxSizeConfig,
+		}
+		opts = append(opts, exporterhelper.WithBatcher(batcherConfig))
+
+		// Effectively disable timeout_sender because timeout is enforced in the bulk indexer.
+		//
+		// We keep timeout_sender enabled in the async mode (Batcher.Enabled == nil),
+		// to ensure sending data to the background workers will not block indefinitely.
+		opts = append(opts, exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}))
+	}
+	return opts
+}
diff --git a/exporter/elasticsearchexporter/integrationtest/datareceiver.go b/exporter/elasticsearchexporter/integrationtest/datareceiver.go
index e530d864fb35..14158cbef9e9 100644
--- a/exporter/elasticsearchexporter/integrationtest/datareceiver.go
+++ b/exporter/elasticsearchexporter/integrationtest/datareceiver.go
@@ -47,16 +47,35 @@ type esDataReceiver struct {
 	receiver          receiver.Logs
 	endpoint          string
 	decodeBulkRequest bool
+	batcherEnabled    *bool
 	t                 testing.TB
 }
 
-func newElasticsearchDataReceiver(t testing.TB, decodeBulkRequest bool) *esDataReceiver {
-	return &esDataReceiver{
+type dataReceiverOption func(*esDataReceiver)
+
+func newElasticsearchDataReceiver(t testing.TB, opts ...dataReceiverOption) *esDataReceiver {
+	r := &esDataReceiver{
 		DataReceiverBase:  testbed.DataReceiverBase{},
 		endpoint:          fmt.Sprintf("http://%s:%d", testbed.DefaultHost, testutil.GetAvailablePort(t)),
-		decodeBulkRequest: decodeBulkRequest,
+		decodeBulkRequest: true,
 		t:                 t,
 	}
+	for _, opt := range opts {
+		opt(r)
+	}
+	return r
+}
+
+func withDecodeBulkRequest(decode bool) dataReceiverOption {
+	return func(r *esDataReceiver) {
+		r.decodeBulkRequest = decode
+	}
+}
+
+func withBatcherEnabled(enabled bool) dataReceiverOption {
+	return func(r *esDataReceiver) {
+		r.batcherEnabled = &enabled
+	}
+}
 
 func (es *esDataReceiver) Start(tc consumer.Traces, _ consumer.Metrics, lc consumer.Logs) error {
@@ -102,20 +121,34 @@ func (es *esDataReceiver) Stop() error {
 
 func (es *esDataReceiver) GenConfigYAMLStr() string {
 	// Note that this generates an exporter config for agent.
-	cfgFormat := `
+	cfgFormat := fmt.Sprintf(`
   elasticsearch:
     endpoints: [%s]
     logs_index: %s
     traces_index: %s
-    flush:
-      interval: 1s
     sending_queue:
      enabled: true
     retry:
      enabled: true
-      max_requests: 10000
-`
-	return fmt.Sprintf(cfgFormat, es.endpoint, TestLogsIndex, TestTracesIndex)
+      initial_interval: 100ms
+      max_interval: 1s
+      max_requests: 10000`,
+		es.endpoint, TestLogsIndex, TestTracesIndex,
+	)
+
+	if es.batcherEnabled == nil {
+		cfgFormat += `
+    flush:
+      interval: 1s`
+	} else {
+		cfgFormat += fmt.Sprintf(`
+    batcher:
+      flush_timeout: 1s
+      enabled: %v`,
+			*es.batcherEnabled,
+		)
+	}
+	return cfgFormat + "\n"
 }
 
 func (es *esDataReceiver) ProtocolName() string {
diff --git a/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go b/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go
index 2a1fe98db9b3..18f98c1911f8 100644
--- a/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go
+++ b/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go
@@ -125,7 +125,7 @@ func prepareBenchmark(
 	cfg := &benchRunnerCfg{}
 	// Benchmarks don't decode the bulk requests to avoid allocations to pollute the results.
-	receiver := newElasticsearchDataReceiver(b, false /* DecodeBulkRequest */)
+	receiver := newElasticsearchDataReceiver(b, withDecodeBulkRequest(false))
 
 	cfg.provider = testbed.NewPerfTestDataProvider(testbed.LoadOptions{ItemsPerBatch: batchSize})
 	cfg.provider.SetLoadGeneratorCounters(&cfg.generatedCount)
diff --git a/exporter/elasticsearchexporter/integrationtest/exporter_test.go b/exporter/elasticsearchexporter/integrationtest/exporter_test.go
index c0df3d575308..013994898511 100644
--- a/exporter/elasticsearchexporter/integrationtest/exporter_test.go
+++ b/exporter/elasticsearchexporter/integrationtest/exporter_test.go
@@ -20,6 +20,13 @@ func TestExporter(t *testing.T) {
 	for _, eventType := range []string{"logs", "traces"} {
 		for _, tc := range []struct {
 			name string
+
+			// batcherEnabled enables/disables the batch sender. If this is
+			// nil, then the exporter buffers data itself (legacy behavior),
+			// whereas if it is non-nil then the exporter will not perform
+			// any buffering itself.
+			batcherEnabled *bool
+
 			// restartCollector restarts the OTEL collector. Restarting
 			// the collector allows durability testing of the ES exporter
 			// based on the OTEL config used for testing.
@@ -28,19 +35,29 @@ func TestExporter(t *testing.T) {
 			restartCollector bool
 			mockESFailure    bool
 		}{
 			{name: "basic"},
 			{name: "es_intermittent_failure", mockESFailure: true},
+
+			{name: "batcher_enabled", batcherEnabled: ptrTo(true)},
+			{name: "batcher_enabled_es_intermittent_failure", batcherEnabled: ptrTo(true), mockESFailure: true},
+			{name: "batcher_disabled", batcherEnabled: ptrTo(false)},
+			{name: "batcher_disabled_es_intermittent_failure", batcherEnabled: ptrTo(false), mockESFailure: true},
+
 			/* TODO: Below tests should be enabled after https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/30792 is fixed
 			{name: "collector_restarts", restartCollector: true},
 			{name: "collector_restart_with_es_intermittent_failure", mockESFailure: true, restartCollector: true},
 			*/
 		} {
 			t.Run(fmt.Sprintf("%s/%s", eventType, tc.name), func(t *testing.T) {
-				runner(t, eventType, tc.restartCollector, tc.mockESFailure)
+				var opts []dataReceiverOption
+				if tc.batcherEnabled != nil {
+					opts = append(opts, withBatcherEnabled(*tc.batcherEnabled))
+				}
+				runner(t, eventType, tc.restartCollector, tc.mockESFailure, opts...)
 			})
 		}
 	}
 }
 
-func runner(t *testing.T, eventType string, restartCollector, mockESFailure bool) {
+func runner(t *testing.T, eventType string, restartCollector, mockESFailure bool, opts ...dataReceiverOption) {
 	t.Helper()
 
 	var (
@@ -57,7 +74,7 @@ func runner(t *testing.T, eventType string, restartCollector, mockESFailure bool
 		t.Fatalf("failed to create data sender for type: %s", eventType)
 	}
 
-	receiver := newElasticsearchDataReceiver(t, true)
+	receiver := newElasticsearchDataReceiver(t, opts...)
 	loadOpts := testbed.LoadOptions{
 		DataItemsPerSecond: 1_000,
 		ItemsPerBatch:      10,
@@ -123,3 +140,7 @@ func runner(t *testing.T, eventType string, restartCollector, mockESFailure bool
 	)
 	tc.ValidateData()
 }
+
+func ptrTo[T any](t T) *T {
+	return &t
+}
diff --git a/exporter/elasticsearchexporter/testdata/config.yaml b/exporter/elasticsearchexporter/testdata/config.yaml
index acd6e92f9001..6f614399b579 100644
--- a/exporter/elasticsearchexporter/testdata/config.yaml
+++ b/exporter/elasticsearchexporter/testdata/config.yaml
@@ -82,3 +82,7 @@ elasticsearch/deprecated_index:
     index: my_log_index
 elasticsearch/confighttp_endpoint:
   endpoint: https://elastic.example.com:9200
+elasticsearch/batcher_disabled:
+  endpoint: https://elastic.example.com:9200
+  batcher:
+    enabled: false