Skip to content

Commit

Permalink
Merge pull request #1 from cxdy/feat/kafkareceiver-fetch-configurable
Browse files Browse the repository at this point in the history
[receiver/kafkareceiver]: allow tunable fetch sizes
  • Loading branch information
cxdy authored Aug 6, 2024
2 parents 2b34b4e + 8c7e4d8 commit 71cdb84
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 0 deletions.
3 changes: 3 additions & 0 deletions receiver/kafkareceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ The following settings can be optionally configured:
- `initial_offset` (default = latest): The initial offset to use if no offset was previously committed. Must be `latest` or `earliest`.
- `session_timeout` (default = `10s`): The request timeout for detecting client failures when using Kafka’s group management facilities.
- `heartbeat_interval` (default = `3s`): The expected time between heartbeats to the consumer coordinator when using Kafka’s group management facilities.
- `min_fetch_size` (default = `1`): The minimum number of message bytes to fetch in a request, defaults to 1 byte.
- `default_fetch_size` (default = `1048576`): The default number of message bytes to fetch in a request, defaults to 1MB.
- `max_fetch_size` (default = `0`): The maximum number of message bytes to fetch in a request, defaults to unlimited.
- `auth`
- `plain_text`
- `username`: The username to use.
Expand Down
7 changes: 7 additions & 0 deletions receiver/kafkareceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,13 @@ type Config struct {

// Extract headers from kafka records
HeaderExtraction HeaderExtraction `mapstructure:"header_extraction"`

// The minimum bytes per fetch from Kafka (default "1")
MinFetchSize int32 `mapstructure:"min_fetch_size"`
// The default bytes per fetch from Kafka (default "1048576")
DefaultFetchSize int32 `mapstructure:"default_fetch_size"`
// The maximum bytes per fetch from Kafka (default "0")
MaxFetchSize int32 `mapstructure:"max_fetch_size"`
}

const (
Expand Down
6 changes: 6 additions & 0 deletions receiver/kafkareceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ func TestLoadConfig(t *testing.T) {
Enable: true,
Interval: 1 * time.Second,
},
MinFetchSize: 1,
DefaultFetchSize: 1048576,
MaxFetchSize: 0,
},
},
{
Expand Down Expand Up @@ -96,6 +99,9 @@ func TestLoadConfig(t *testing.T) {
Enable: true,
Interval: 1 * time.Second,
},
MinFetchSize: 1,
DefaultFetchSize: 1048576,
MaxFetchSize: 0,
},
},
}
Expand Down
10 changes: 10 additions & 0 deletions receiver/kafkareceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ const (
defaultAutoCommitEnable = true
// default from sarama.NewConfig()
defaultAutoCommitInterval = 1 * time.Second

// default from sarama.NewConfig()
defaultMinFetchSize = 1
// default from sarama.NewConfig()
defaultDefaultFetchSize = 1048576
// default from sarama.NewConfig()
defaultMaxFetchSize = 0
)

var errUnrecognizedEncoding = fmt.Errorf("unrecognized encoding")
Expand Down Expand Up @@ -120,6 +127,9 @@ func createDefaultConfig() component.Config {
HeaderExtraction: HeaderExtraction{
ExtractHeaders: false,
},
MinFetchSize: defaultMinFetchSize,
DefaultFetchSize: defaultDefaultFetchSize,
MaxFetchSize: defaultMaxFetchSize,
}
}

Expand Down
3 changes: 3 additions & 0 deletions receiver/kafkareceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ func TestCreateDefaultConfig(t *testing.T) {
assert.Equal(t, defaultInitialOffset, cfg.InitialOffset)
assert.Equal(t, defaultSessionTimeout, cfg.SessionTimeout)
assert.Equal(t, defaultHeartbeatInterval, cfg.HeartbeatInterval)
assert.Equal(t, defaultMinFetchSize, cfg.MinFetchSize)
assert.Equal(t, defaultDefaultFetchSize, cfg.DefaultFetchSize)
assert.Equal(t, defaultMaxFetchSize, cfg.MaxFetchSize)
}

func TestCreateTracesReceiver(t *testing.T) {
Expand Down
21 changes: 21 additions & 0 deletions receiver/kafkareceiver/kafka_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ type kafkaTracesConsumer struct {
messageMarking MessageMarking
headerExtraction bool
headers []string
minFetchSize int32
defaultFetchSize int32
maxFetchSize int32
}

// kafkaMetricsConsumer uses sarama to consume and handle messages from kafka.
Expand All @@ -66,6 +69,9 @@ type kafkaMetricsConsumer struct {
messageMarking MessageMarking
headerExtraction bool
headers []string
minFetchSize int32
defaultFetchSize int32
maxFetchSize int32
}

// kafkaLogsConsumer uses sarama to consume and handle messages from kafka.
Expand All @@ -84,6 +90,9 @@ type kafkaLogsConsumer struct {
messageMarking MessageMarking
headerExtraction bool
headers []string
minFetchSize int32
defaultFetchSize int32
maxFetchSize int32
}

var _ receiver.Traces = (*kafkaTracesConsumer)(nil)
Expand Down Expand Up @@ -111,6 +120,9 @@ func newTracesReceiver(config Config, set receiver.Settings, unmarshaler TracesU
headerExtraction: config.HeaderExtraction.ExtractHeaders,
headers: config.HeaderExtraction.Headers,
telemetryBuilder: telemetryBuilder,
minFetchSize: config.MinFetchSize,
defaultFetchSize: config.DefaultFetchSize,
maxFetchSize: config.MaxFetchSize,
}, nil
}

Expand All @@ -124,6 +136,9 @@ func createKafkaClient(config Config) (sarama.ConsumerGroup, error) {
saramaConfig.Consumer.Offsets.AutoCommit.Interval = config.AutoCommit.Interval
saramaConfig.Consumer.Group.Session.Timeout = config.SessionTimeout
saramaConfig.Consumer.Group.Heartbeat.Interval = config.HeartbeatInterval
saramaConfig.Consumer.Fetch.Min = config.MinFetchSize
saramaConfig.Consumer.Fetch.Default = config.DefaultFetchSize
saramaConfig.Consumer.Fetch.Max = config.MaxFetchSize

var err error
if saramaConfig.Consumer.Offsets.Initial, err = toSaramaInitialOffset(config.InitialOffset); err != nil {
Expand Down Expand Up @@ -234,6 +249,9 @@ func newMetricsReceiver(config Config, set receiver.Settings, unmarshaler Metric
headerExtraction: config.HeaderExtraction.ExtractHeaders,
headers: config.HeaderExtraction.Headers,
telemetryBuilder: telemetryBuilder,
minFetchSize: config.MinFetchSize,
defaultFetchSize: config.DefaultFetchSize,
maxFetchSize: config.MaxFetchSize,
}, nil
}

Expand Down Expand Up @@ -328,6 +346,9 @@ func newLogsReceiver(config Config, set receiver.Settings, unmarshaler LogsUnmar
headerExtraction: config.HeaderExtraction.ExtractHeaders,
headers: config.HeaderExtraction.Headers,
telemetryBuilder: telemetryBuilder,
minFetchSize: config.MinFetchSize,
defaultFetchSize: config.DefaultFetchSize,
maxFetchSize: config.MaxFetchSize,
}, nil
}

Expand Down

0 comments on commit 71cdb84

Please sign in to comment.