diff --git a/receiver/kafkareceiver/README.md b/receiver/kafkareceiver/README.md index 84dc5a8dcc9e..495fb3b422ae 100644 --- a/receiver/kafkareceiver/README.md +++ b/receiver/kafkareceiver/README.md @@ -45,6 +45,9 @@ The following settings can be optionally configured: - `initial_offset` (default = latest): The initial offset to use if no offset was previously committed. Must be `latest` or `earliest`. - `session_timeout` (default = `10s`): The request timeout for detecting client failures when using Kafka’s group management facilities. - `heartbeat_interval` (default = `3s`): The expected time between heartbeats to the consumer coordinator when using Kafka’s group management facilities. +- `min_fetch_size` (default = `1`): The minimum number of message bytes to fetch in a request, defaults to 1 byte. +- `default_fetch_size` (default = `1048576`): The default number of message bytes to fetch in a request, defaults to 1MB. +- `max_fetch_size` (default = `0`): The maximum number of message bytes to fetch in a request, defaults to unlimited. - `auth` - `plain_text` - `username`: The username to use. diff --git a/receiver/kafkareceiver/config.go b/receiver/kafkareceiver/config.go index 85d72ed3b429..eea308e1199a 100644 --- a/receiver/kafkareceiver/config.go +++ b/receiver/kafkareceiver/config.go @@ -78,6 +78,13 @@ type Config struct { // Extract headers from kafka records HeaderExtraction HeaderExtraction `mapstructure:"header_extraction"` + + // The minimum bytes per fetch from Kafka (default "1") + MinFetchSize int32 `mapstructure:"min_fetch_size"` + // The default bytes per fetch from Kafka (default "1048576") + DefaultFetchSize int32 `mapstructure:"default_fetch_size"` + // The maximum bytes per fetch from Kafka (default "0") + MaxFetchSize int32 `mapstructure:"max_fetch_size"` } const ( diff --git a/receiver/kafkareceiver/config_test.go b/receiver/kafkareceiver/config_test.go index a8fea4b1267f..2eb643b4275e 100644 --- a/receiver/kafkareceiver/config_test.go +++ b/receiver/kafkareceiver/config_test.go @@ -62,6 +62,9 @@ func TestLoadConfig(t *testing.T) { Enable: true, Interval: 1 * time.Second, }, + MinFetchSize: 1, + DefaultFetchSize: 1048576, + MaxFetchSize: 0, }, }, { @@ -96,6 +99,9 @@ func TestLoadConfig(t *testing.T) { Enable: true, Interval: 1 * time.Second, }, + MinFetchSize: 1, + DefaultFetchSize: 1048576, + MaxFetchSize: 0, }, }, } diff --git a/receiver/kafkareceiver/factory.go b/receiver/kafkareceiver/factory.go index ef5cd9610702..7e47a20c8649 100644 --- a/receiver/kafkareceiver/factory.go +++ b/receiver/kafkareceiver/factory.go @@ -40,6 +40,13 @@ const ( defaultAutoCommitEnable = true // default from sarama.NewConfig() defaultAutoCommitInterval = 1 * time.Second + + // default from sarama.NewConfig() + defaultMinFetchSize = 1 + // default from sarama.NewConfig() + defaultDefaultFetchSize = 1048576 + // default from sarama.NewConfig() + defaultMaxFetchSize = 0 ) var errUnrecognizedEncoding = fmt.Errorf("unrecognized encoding") @@ -120,6 +127,9 @@ func createDefaultConfig() component.Config { HeaderExtraction: HeaderExtraction{ ExtractHeaders: false, }, + MinFetchSize: defaultMinFetchSize, + DefaultFetchSize: defaultDefaultFetchSize, + MaxFetchSize: defaultMaxFetchSize, } } diff --git a/receiver/kafkareceiver/factory_test.go b/receiver/kafkareceiver/factory_test.go index e386967abd7d..87c435edba2a 100644 --- a/receiver/kafkareceiver/factory_test.go +++ b/receiver/kafkareceiver/factory_test.go @@ -28,6 +28,9 @@ func TestCreateDefaultConfig(t *testing.T) { assert.Equal(t, defaultInitialOffset, cfg.InitialOffset) assert.Equal(t, defaultSessionTimeout, cfg.SessionTimeout) assert.Equal(t, defaultHeartbeatInterval, cfg.HeartbeatInterval) + assert.Equal(t, defaultMinFetchSize, cfg.MinFetchSize) + assert.Equal(t, defaultDefaultFetchSize, cfg.DefaultFetchSize) + assert.Equal(t, defaultMaxFetchSize, cfg.MaxFetchSize) } func TestCreateTracesReceiver(t *testing.T) { diff --git a/receiver/kafkareceiver/kafka_receiver.go b/receiver/kafkareceiver/kafka_receiver.go index c9c7848fdbd6..b11c883fef71 100644 --- a/receiver/kafkareceiver/kafka_receiver.go +++ b/receiver/kafkareceiver/kafka_receiver.go @@ -48,6 +48,9 @@ type kafkaTracesConsumer struct { messageMarking MessageMarking headerExtraction bool headers []string + minFetchSize int32 + defaultFetchSize int32 + maxFetchSize int32 } // kafkaMetricsConsumer uses sarama to consume and handle messages from kafka. @@ -66,6 +69,9 @@ type kafkaMetricsConsumer struct { messageMarking MessageMarking headerExtraction bool headers []string + minFetchSize int32 + defaultFetchSize int32 + maxFetchSize int32 } // kafkaLogsConsumer uses sarama to consume and handle messages from kafka. @@ -84,6 +90,9 @@ type kafkaLogsConsumer struct { messageMarking MessageMarking headerExtraction bool headers []string + minFetchSize int32 + defaultFetchSize int32 + maxFetchSize int32 } var _ receiver.Traces = (*kafkaTracesConsumer)(nil) @@ -111,6 +120,9 @@ func newTracesReceiver(config Config, set receiver.Settings, unmarshaler TracesU headerExtraction: config.HeaderExtraction.ExtractHeaders, headers: config.HeaderExtraction.Headers, telemetryBuilder: telemetryBuilder, + minFetchSize: config.MinFetchSize, + defaultFetchSize: config.DefaultFetchSize, + maxFetchSize: config.MaxFetchSize, }, nil } @@ -124,6 +136,9 @@ func createKafkaClient(config Config) (sarama.ConsumerGroup, error) { saramaConfig.Consumer.Offsets.AutoCommit.Interval = config.AutoCommit.Interval saramaConfig.Consumer.Group.Session.Timeout = config.SessionTimeout saramaConfig.Consumer.Group.Heartbeat.Interval = config.HeartbeatInterval + saramaConfig.Consumer.Fetch.Min = config.MinFetchSize + saramaConfig.Consumer.Fetch.Default = config.DefaultFetchSize + saramaConfig.Consumer.Fetch.Max = config.MaxFetchSize var err error if saramaConfig.Consumer.Offsets.Initial, err = toSaramaInitialOffset(config.InitialOffset); err != nil { @@ -234,6 +249,9 @@ func newMetricsReceiver(config Config, set receiver.Settings, unmarshaler Metric headerExtraction: config.HeaderExtraction.ExtractHeaders, headers: config.HeaderExtraction.Headers, telemetryBuilder: telemetryBuilder, + minFetchSize: config.MinFetchSize, + defaultFetchSize: config.DefaultFetchSize, + maxFetchSize: config.MaxFetchSize, }, nil } @@ -328,6 +346,9 @@ func newLogsReceiver(config Config, set receiver.Settings, unmarshaler LogsUnmar headerExtraction: config.HeaderExtraction.ExtractHeaders, headers: config.HeaderExtraction.Headers, telemetryBuilder: telemetryBuilder, + minFetchSize: config.MinFetchSize, + defaultFetchSize: config.DefaultFetchSize, + maxFetchSize: config.MaxFetchSize, }, nil }