Skip to content

Commit

Permalink
feat: fluentd output plugin kafka support rdkafka2 client
Browse files Browse the repository at this point in the history
Signed-off-by: Onecer <[email protected]>
  • Loading branch information
onecer committed Jul 17, 2024
1 parent 2f2d757 commit d81ee11
Show file tree
Hide file tree
Showing 8 changed files with 82 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3424,6 +3424,8 @@ spec:
type: string
use_default_for_unknown_topic:
type: boolean
use_rdkafka:
type: boolean
username:
properties:
mountFrom:
Expand Down Expand Up @@ -10821,6 +10823,8 @@ spec:
type: string
use_default_for_unknown_topic:
type: boolean
use_rdkafka:
type: boolean
username:
properties:
mountFrom:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3420,6 +3420,8 @@ spec:
type: string
use_default_for_unknown_topic:
type: boolean
use_rdkafka:
type: boolean
username:
properties:
mountFrom:
Expand Down Expand Up @@ -10091,6 +10093,8 @@ spec:
type: string
use_default_for_unknown_topic:
type: boolean
use_rdkafka:
type: boolean
username:
properties:
mountFrom:
Expand Down
4 changes: 4 additions & 0 deletions config/crd/bases/logging.banzaicloud.io_clusteroutputs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3424,6 +3424,8 @@ spec:
type: string
use_default_for_unknown_topic:
type: boolean
use_rdkafka:
type: boolean
username:
properties:
mountFrom:
Expand Down Expand Up @@ -10821,6 +10823,8 @@ spec:
type: string
use_default_for_unknown_topic:
type: boolean
use_rdkafka:
type: boolean
username:
properties:
mountFrom:
Expand Down
4 changes: 4 additions & 0 deletions config/crd/bases/logging.banzaicloud.io_outputs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3420,6 +3420,8 @@ spec:
type: string
use_default_for_unknown_topic:
type: boolean
use_rdkafka:
type: boolean
username:
properties:
mountFrom:
Expand Down Expand Up @@ -10091,6 +10093,8 @@ spec:
type: string
use_default_for_unknown_topic:
type: boolean
use_rdkafka:
type: boolean
username:
properties:
mountFrom:
Expand Down
11 changes: 9 additions & 2 deletions docs/configuration/plugins/outputs/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ spec:
## Configuration
## Kafka
Send your logs to Kafka
Send your logs to Kafka.
Setting use_rdkafka to true opts for rdkafka2, which offers higher performance compared to ruby-kafka.
-[more info](https://github.com/fluent/fluent-plugin-kafka#output-plugin)
### ack_timeout (int, optional) {#kafka-ack_timeout}
Expand Down Expand Up @@ -212,7 +214,7 @@ Client certificate key
Verify certificate hostname
### sasl_over_ssl (bool, required) {#kafka-sasl_over_ssl}
### sasl_over_ssl (*bool, optional) {#kafka-sasl_over_ssl}
SASL over SSL
Expand Down Expand Up @@ -240,6 +242,11 @@ Use default for unknown topics

Default: false

### use_rdkafka (bool, optional) {#kafka-use_rdkafka}

Use rdkafka of the output plugin.


### username (*secret.Secret, optional) {#kafka-username}

Username when using PLAIN/SCRAM SASL authentication
Expand Down
17 changes: 13 additions & 4 deletions pkg/sdk/logging/model/output/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,12 @@ type _metaKafka interface{} //nolint:deadcode,unused

// +kubebuilder:object:generate=true
// +docName:"Kafka"
// Send your logs to Kafka
// Send your logs to Kafka.
// Setting use_rdkafka to true opts for rdkafka2, which offers higher performance compared to ruby-kafka.
// -[more info](https://github.com/fluent/fluent-plugin-kafka#output-plugin)
type KafkaOutputConfig struct {

// Use rdkafka of the output plugin.
UseRdkafka bool `json:"use_rdkafka,omitempty"`
// The list of all seed brokers, with their host and port information.
Brokers string `json:"brokers"`
// Topic Key (default: "topic")
Expand Down Expand Up @@ -95,7 +98,7 @@ type KafkaOutputConfig struct {
Idempotent bool `json:"idempotent,omitempty"`
// SASL over SSL (default: true)
// +kubebuilder:validation:Optional
SaslOverSSL bool `json:"sasl_over_ssl"`
SaslOverSSL *bool `json:"sasl_over_ssl,omitempty"`
Principal string `json:"principal,omitempty"`
Keytab *secret.Secret `json:"keytab,omitempty"`
// Username when using PLAIN/SCRAM SASL authentication
Expand Down Expand Up @@ -141,7 +144,10 @@ type KafkaOutputConfig struct {
}

func (e *KafkaOutputConfig) ToDirective(secretLoader secret.SecretLoader, id string) (types.Directive, error) {
const pluginType = "kafka2"
pluginType := "kafka2"
if e.UseRdkafka {
pluginType = "rdkafka2"
}
kafka := &types.OutputPlugin{
PluginMeta: types.PluginMeta{
Type: pluginType,
Expand Down Expand Up @@ -171,5 +177,8 @@ func (e *KafkaOutputConfig) ToDirective(secretLoader secret.SecretLoader, id str
kafka.SubDirectives = append(kafka.SubDirectives, format)
}
}

// remove use_rdkafka from params, it is not a valid parameter for plugin config
delete(kafka.Params, "use_rdkafka")
return kafka, nil
}
39 changes: 39 additions & 0 deletions pkg/sdk/logging/model/output/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,42 @@ buffer:
test := render.NewOutputPluginTest(t, kafka)
test.DiffResult(expected)
}

func TestRdkafka(t *testing.T) {
CONFIG := []byte(`
brokers: kafka-headless.kafka.svc.cluster.local:29092
default_topic: topic
use_rdkafka: true
ssl_verify_hostname: false
format:
type: json
buffer:
timekey: 1m
timekey_wait: 30s
timekey_use_utc: true
`)
expected := `
<match **>
@type rdkafka2
@id test
brokers kafka-headless.kafka.svc.cluster.local:29092
default_topic topic
ssl_verify_hostname false
<buffer tag,time>
@type file
path /buffers/test.*.buffer
retry_forever true
timekey 1m
timekey_use_utc true
timekey_wait 30s
</buffer>
<format>
@type json
</format>
</match>
`
kafka := &output.KafkaOutputConfig{}
require.NoError(t, yaml.Unmarshal(CONFIG, kafka))
test := render.NewOutputPluginTest(t, kafka)
test.DiffResult(expected)
}
5 changes: 5 additions & 0 deletions pkg/sdk/logging/model/output/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit d81ee11

Please sign in to comment.