[pkg/stanza] Make batching behavior configurable #21184

Closed
16 changes: 16 additions & 0 deletions .chloggen/stanza-make-batch-configurable.yaml
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: pkg/stanza

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Make batching behavior configurable

# One or more tracking issues related to the change
issues: [21184]

subtext: |
Expose configuration options to control the batching behavior of the stanza receivers:
  - `batch.max_batch_size`: The maximum number of log records to batch together.
- `batch.timeout`: The maximum amount of time to wait before sending a batch.
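
For illustration, a minimal collector snippet setting both options on a stanza-based receiver might look like the sketch below (the `syslog` receiver and its `protocol`/`tcp` values are only an example, not part of this change):

```yaml
receivers:
  syslog:
    protocol: rfc5424
    tcp:
      listen_address: "0.0.0.0:54526"
    # New keys introduced by this change:
    batch:
      max_batch_size: 200   # flush once 200 log records are buffered
      timeout: 50ms         # or after 50ms, whichever happens first
```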
20 changes: 20 additions & 0 deletions pkg/stanza/adapter/config.go
@@ -15,15 +15,35 @@
package adapter // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter"

import (
"time"

"go.opentelemetry.io/collector/component"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/consumerretry"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
)

// BatchConfig is the configuration for logs batching. Logs are sent to the next consumer either when the batch size
// reaches MaxBatchSize or when the timeout is reached, whichever happens first.
type BatchConfig struct {
// MaxBatchSize specifies the maximum number of logs to batch.
MaxBatchSize uint `mapstructure:"max_batch_size"`

// Timeout specifies the maximum duration for batching logs.
Timeout time.Duration `mapstructure:"timeout"`
}

func NewDefaultBatchConfig() BatchConfig {
return BatchConfig{
MaxBatchSize: 100,
Timeout: 100 * time.Millisecond,
}
}

// BaseConfig is the common configuration of a stanza-based receiver
type BaseConfig struct {
Operators []operator.Config `mapstructure:"operators"`
StorageID *component.ID `mapstructure:"storage"`
Batch BatchConfig `mapstructure:"batch"`
RetryOnFailure consumerretry.Config `mapstructure:"retry_on_failure"`
}
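
Given the mapstructure tags above, `batch` nests alongside the existing `operators`, `storage`, and `retry_on_failure` keys in a receiver's configuration. A sketch of the defaults from `NewDefaultBatchConfig`, spelled out explicitly:

```yaml
# Equivalent of NewDefaultBatchConfig(): these are the values used when the
# `batch` block is omitted from a receiver's configuration.
batch:
  max_batch_size: 100   # MaxBatchSize: emit once 100 entries are buffered
  timeout: 100ms        # Timeout: emit after 100ms even if the batch is not full
```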
13 changes: 4 additions & 9 deletions pkg/stanza/adapter/emitter.go
@@ -39,13 +39,8 @@ type LogEmitter struct {
flushInterval time.Duration
}

var (
defaultFlushInterval = 100 * time.Millisecond
defaultMaxBatchSize uint = 100
)

// NewLogEmitter creates a new receiver output
func NewLogEmitter(logger *zap.SugaredLogger) *LogEmitter {
func NewLogEmitter(logger *zap.SugaredLogger, bc BatchConfig) *LogEmitter {
return &LogEmitter{
OutputOperator: helper.OutputOperator{
BasicOperator: helper.BasicOperator{
@@ -55,9 +50,9 @@ func NewLogEmitter(logger *zap.SugaredLogger) *LogEmitter {
},
},
logChan: make(chan []*entry.Entry),
maxBatchSize: defaultMaxBatchSize,
batch: make([]*entry.Entry, 0, defaultMaxBatchSize),
flushInterval: defaultFlushInterval,
maxBatchSize: bc.MaxBatchSize,
batch: make([]*entry.Entry, 0, bc.MaxBatchSize),
flushInterval: bc.Timeout,
cancel: func() {},
}
}
31 changes: 13 additions & 18 deletions pkg/stanza/adapter/emitter_test.go
@@ -26,7 +26,7 @@ import (
)

func TestLogEmitter(t *testing.T) {
emitter := NewLogEmitter(zaptest.NewLogger(t).Sugar())
emitter := NewLogEmitter(zaptest.NewLogger(t).Sugar(), NewDefaultBatchConfig())

require.NoError(t, emitter.Start(nil))

@@ -49,11 +49,11 @@ func TestLogEmitter(t *testing.T) {
}

func TestLogEmitterEmitsOnMaxBatchSize(t *testing.T) {
const (
maxBatchSize = 100
timeout = time.Second
)
emitter := NewLogEmitter(zaptest.NewLogger(t).Sugar())
const maxBatchSize = 10
emitter := NewLogEmitter(zaptest.NewLogger(t).Sugar(), BatchConfig{
MaxBatchSize: maxBatchSize,
Timeout: time.Second,
})

require.NoError(t, emitter.Start(nil))
defer func() {
Expand All @@ -69,7 +69,7 @@ func TestLogEmitterEmitsOnMaxBatchSize(t *testing.T) {
}
}()

timeoutChan := time.After(timeout)
timeoutChan := time.After(500 * time.Millisecond)

select {
case recv := <-emitter.logChan:
@@ -80,30 +80,25 @@ func TestLogEmitterEmitsOnMaxBatchSize(t *testing.T) {
}

func TestLogEmitterEmitsOnFlushInterval(t *testing.T) {
const (
flushInterval = 100 * time.Millisecond
timeout = time.Second
)
emitter := NewLogEmitter(zaptest.NewLogger(t).Sugar())
emitter := NewLogEmitter(zaptest.NewLogger(t).Sugar(), BatchConfig{
MaxBatchSize: 100,
Timeout: 10 * time.Millisecond,
})

require.NoError(t, emitter.Start(nil))
defer func() {
require.NoError(t, emitter.Stop())
}()

entry := complexEntry()

go func() {
ctx := context.Background()
require.NoError(t, emitter.Process(ctx, entry))
require.NoError(t, emitter.Process(ctx, complexEntry()))
}()

timeoutChan := time.After(timeout)

select {
case recv := <-emitter.logChan:
require.Equal(t, 1, len(recv), "Should have received one entry, got %d instead", len(recv))
case <-timeoutChan:
case <-time.After(time.Millisecond * 50):
require.FailNow(t, "Failed to receive log entry before timeout")
}
}
2 changes: 1 addition & 1 deletion pkg/stanza/adapter/factory.go
@@ -56,7 +56,7 @@ func createLogsReceiver(logReceiverType LogReceiverType) rcvr.CreateLogsFunc {

operators := append([]operator.Config{inputCfg}, baseCfg.Operators...)

emitter := NewLogEmitter(params.Logger.Sugar())
emitter := NewLogEmitter(params.Logger.Sugar(), baseCfg.Batch)
pipe, err := pipeline.Config{
Operators: operators,
DefaultOutput: emitter,
2 changes: 1 addition & 1 deletion pkg/stanza/adapter/integration_test.go
@@ -34,7 +34,7 @@ import (
)

func createNoopReceiver(nextConsumer consumer.Logs) (*receiver, error) {
emitter := NewLogEmitter(zap.NewNop().Sugar())
emitter := NewLogEmitter(zap.NewNop().Sugar(), NewDefaultBatchConfig())

pipe, err := pipeline.Config{
Operators: []operator.Config{
1 change: 1 addition & 0 deletions pkg/stanza/adapter/mocks_test.go
@@ -83,6 +83,7 @@ func (f TestReceiverType) CreateDefaultConfig() component.Config {
return &TestConfig{
BaseConfig: BaseConfig{
Operators: []operator.Config{},
Batch: NewDefaultBatchConfig(),
},
Input: operator.NewConfig(noop.NewConfig()),
}
4 changes: 2 additions & 2 deletions pkg/stanza/adapter/receiver_test.go
@@ -141,7 +141,7 @@ func BenchmarkReadLine(b *testing.B) {
var operatorCfgs []operator.Config
require.NoError(b, yaml.Unmarshal([]byte(pipelineYaml), &operatorCfgs))

emitter := NewLogEmitter(zap.NewNop().Sugar())
emitter := NewLogEmitter(zap.NewNop().Sugar(), NewDefaultBatchConfig())
defer func() {
require.NoError(b, emitter.Stop())
}()
@@ -206,7 +206,7 @@ func BenchmarkParseAndMap(b *testing.B) {
var operatorCfgs []operator.Config
require.NoError(b, yaml.Unmarshal([]byte(pipelineYaml), &operatorCfgs))

emitter := NewLogEmitter(zap.NewNop().Sugar())
emitter := NewLogEmitter(zap.NewNop().Sugar(), NewDefaultBatchConfig())
defer func() {
require.NoError(b, emitter.Stop())
}()
1 change: 1 addition & 0 deletions processor/logstransformprocessor/config_test.go
@@ -56,6 +56,7 @@ func TestLoadConfig(t *testing.T) {
}(),
},
},
Batch: adapter.NewDefaultBatchConfig(),
},
}, cfg)
}
1 change: 1 addition & 0 deletions processor/logstransformprocessor/factory.go
@@ -46,6 +46,7 @@ func createDefaultConfig() component.Config {
return &Config{
BaseConfig: adapter.BaseConfig{
Operators: []operator.Config{},
Batch: adapter.NewDefaultBatchConfig(),
},
}
}
5 changes: 5 additions & 0 deletions processor/logstransformprocessor/factory_test.go
@@ -17,6 +17,7 @@ package logstransformprocessor
import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/component/componenttest"
@@ -59,6 +60,10 @@ func TestCreateProcessor(t *testing.T) {
}(),
},
},
Batch: adapter.BatchConfig{
MaxBatchSize: 1000,
Timeout: 200 * time.Millisecond,
},
},
}

2 changes: 1 addition & 1 deletion processor/logstransformprocessor/processor.go
@@ -55,7 +55,7 @@ func newProcessor(config *Config, nextConsumer consumer.Logs, logger *zap.Logger

baseCfg := p.config.BaseConfig

p.emitter = adapter.NewLogEmitter(p.logger.Sugar())
p.emitter = adapter.NewLogEmitter(p.logger.Sugar(), adapter.NewDefaultBatchConfig())
pipe, err := pipeline.Config{
Operators: baseCfg.Operators,
DefaultOutput: p.emitter,
2 changes: 2 additions & 0 deletions receiver/filelogreceiver/README.md
@@ -41,6 +41,8 @@ Tails and parses logs from files.
| `retry_on_failure.initial_interval` | `1 second` | Time to wait after the first failure before retrying. |
| `retry_on_failure.max_interval` | `30 seconds` | Upper bound on retry backoff interval. Once this value is reached the delay between consecutive retries will remain constant at the specified value. |
| `retry_on_failure.max_elapsed_time` | `5 minutes` | Maximum amount of time (including retries) spent trying to send a logs batch to a downstream consumer. Once this value is reached, the data is discarded. Retrying never stops if set to `0`. |
| `batch.max_batch_size` | `100` | Maximum number of log records per emitted batch. |
| `batch.timeout` | `100 milliseconds` | Maximum duration to wait before sending out the log records batch. |

Note that _by default_, no logs will be read from a file that is not actively being written to because `start_at` defaults to `end`.
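
For example, a filelog receiver tuned to flush smaller batches more aggressively might look like this sketch (the `include` and `start_at` values are illustrative):

```yaml
receivers:
  filelog:
    include: [ /var/log/myapp/*.log ]
    start_at: beginning
    batch:
      max_batch_size: 50   # emit after 50 log records
      timeout: 20ms        # or after 20ms, whichever comes first
```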

1 change: 1 addition & 0 deletions receiver/filelogreceiver/filelog.go
@@ -51,6 +51,7 @@ func createDefaultConfig() *FileLogConfig {
return &FileLogConfig{
BaseConfig: adapter.BaseConfig{
Operators: []operator.Config{},
Batch: adapter.NewDefaultBatchConfig(),
RetryOnFailure: consumerretry.NewDefaultConfig(),
},
InputConfig: *file.NewConfig(),
2 changes: 2 additions & 0 deletions receiver/journaldreceiver/README.md
@@ -24,6 +24,8 @@ Journald receiver is dependent on `journalctl` binary to be present and must be
| `retry_on_failure.initial_interval` | `1 second` | Time to wait after the first failure before retrying. |
| `retry_on_failure.max_interval` | `30 seconds` | Upper bound on retry backoff interval. Once this value is reached the delay between consecutive retries will remain constant at the specified value. |
| `retry_on_failure.max_elapsed_time` | `5 minutes` | Maximum amount of time (including retries) spent trying to send a logs batch to a downstream consumer. Once this value is reached, the data is discarded. Retrying never stops if set to `0`. |
| `batch.max_batch_size` | `100` | Maximum number of log records per emitted batch. |
| `batch.timeout` | `100 milliseconds` | Maximum duration to wait before sending out the log records batch. |

### Example Configurations
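
The new batch keys combine with the existing journald settings; a sketch (the `units` and `priority` values are illustrative, and because the defaults are populated in the receiver's default config, overriding only `timeout` should leave `max_batch_size` at 100):

```yaml
receivers:
  journald:
    units: [ ssh, kubelet ]
    priority: info
    batch:
      timeout: 250ms   # flush less frequently; max_batch_size stays at its default of 100
```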

1 change: 1 addition & 0 deletions receiver/syslogreceiver/syslog.go
@@ -51,6 +51,7 @@ func (f ReceiverType) CreateDefaultConfig() component.Config {
return &SysLogConfig{
BaseConfig: adapter.BaseConfig{
Operators: []operator.Config{},
Batch: adapter.NewDefaultBatchConfig(),
RetryOnFailure: consumerretry.NewDefaultConfig(),
},
InputConfig: *syslog.NewConfig(),
24 changes: 13 additions & 11 deletions receiver/tcplogreceiver/README.md
@@ -10,17 +10,19 @@ Receives logs over TCP.

## Configuration

| Field | Default | Description |
| --- | --- | --- |
| `max_log_size` | `1MiB` | The maximum size of a log entry to read before failing. Protects against reading large amounts of data into memory |
| `listen_address` | required | A listen address of the form `<ip>:<port>` |
| `tls` | nil | An optional `TLS` configuration (see the TLS configuration section) |
| `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes |
| `resource` | {} | A map of `key: value` pairs to add to the entry's resource |
| `add_attributes` | false | Adds `net.*` attributes according to [semantic convention][https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/span-general.md#general-network-connection-attributes] |
| `multiline` | | A `multiline` configuration block. See below for details |
| `encoding` | `utf-8` | The encoding of the file being read. See the list of supported encodings below for available options |
| `operators` | [] | An array of [operators](../../pkg/stanza/docs/operators/README.md#what-operators-are-available). See below for more details |
| Field | Default | Description |
|------------------------|--------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `max_log_size` | `1MiB` | The maximum size of a log entry to read before failing. Protects against reading large amounts of data into memory |
| `listen_address` | required | A listen address of the form `<ip>:<port>` |
| `tls` | nil | An optional `TLS` configuration (see the TLS configuration section) |
| `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes |
| `resource` | {} | A map of `key: value` pairs to add to the entry's resource |
| `add_attributes` | false | Adds `net.*` attributes according to [semantic convention][https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/span-general.md#general-network-connection-attributes] |
| `multiline` | | A `multiline` configuration block. See below for details |
| `encoding` | `utf-8` | The encoding of the file being read. See the list of supported encodings below for available options |
| `operators` | [] | An array of [operators](../../pkg/stanza/docs/operators/README.md#what-operators-are-available). See below for more details |
| `batch.max_batch_size` | `100` | Maximum number of log records per emitted batch. |
| `batch.timeout` | `100 milliseconds` | Maximum duration to wait before sending out the log records batch. |
Reviewer suggestion — use the shorter duration form for consistency:
| `batch.timeout` | `100 milliseconds` | Maximum duration to wait before sending out the log records batch. |
| `batch.timeout` | `100ms` | Maximum duration to wait before sending out the log records batch. |
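
Whichever spelling lands in the table, the keys work the same way as for the other stanza-based receivers; a sketch for the TCP receiver (address and batch values are illustrative):

```yaml
receivers:
  tcplog:
    listen_address: "0.0.0.0:54525"
    batch:
      max_batch_size: 1000   # larger batches for a high-throughput TCP stream
      timeout: 500ms
```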


### TLS Configuration

1 change: 1 addition & 0 deletions receiver/tcplogreceiver/tcp.go
@@ -47,6 +47,7 @@ func (f ReceiverType) CreateDefaultConfig() component.Config {
return &TCPLogConfig{
BaseConfig: adapter.BaseConfig{
Operators: []operator.Config{},
Batch: adapter.NewDefaultBatchConfig(),
},
InputConfig: *tcp.NewConfig(),
}