From d81ee11b460e1f8a37d41f16c058a66ce7b95fa1 Mon Sep 17 00:00:00 2001 From: Onecer Date: Wed, 10 Jul 2024 14:50:15 +0800 Subject: [PATCH] feat: fluentd output plugin kafka support rdkafka2 client Signed-off-by: Onecer --- ...logging.banzaicloud.io_clusteroutputs.yaml | 4 ++ .../crds/logging.banzaicloud.io_outputs.yaml | 4 ++ ...logging.banzaicloud.io_clusteroutputs.yaml | 4 ++ .../bases/logging.banzaicloud.io_outputs.yaml | 4 ++ docs/configuration/plugins/outputs/kafka.md | 11 +++++- pkg/sdk/logging/model/output/kafka.go | 17 ++++++-- pkg/sdk/logging/model/output/kafka_test.go | 39 +++++++++++++++++++ .../model/output/zz_generated.deepcopy.go | 5 +++ 8 files changed, 82 insertions(+), 6 deletions(-) diff --git a/charts/logging-operator/crds/logging.banzaicloud.io_clusteroutputs.yaml b/charts/logging-operator/crds/logging.banzaicloud.io_clusteroutputs.yaml index 4a5f05e03..9179a1e0f 100644 --- a/charts/logging-operator/crds/logging.banzaicloud.io_clusteroutputs.yaml +++ b/charts/logging-operator/crds/logging.banzaicloud.io_clusteroutputs.yaml @@ -3424,6 +3424,8 @@ spec: type: string use_default_for_unknown_topic: type: boolean + use_rdkafka: + type: boolean username: properties: mountFrom: @@ -10821,6 +10823,8 @@ spec: type: string use_default_for_unknown_topic: type: boolean + use_rdkafka: + type: boolean username: properties: mountFrom: diff --git a/charts/logging-operator/crds/logging.banzaicloud.io_outputs.yaml b/charts/logging-operator/crds/logging.banzaicloud.io_outputs.yaml index 222995229..7f24ee24f 100644 --- a/charts/logging-operator/crds/logging.banzaicloud.io_outputs.yaml +++ b/charts/logging-operator/crds/logging.banzaicloud.io_outputs.yaml @@ -3420,6 +3420,8 @@ spec: type: string use_default_for_unknown_topic: type: boolean + use_rdkafka: + type: boolean username: properties: mountFrom: @@ -10091,6 +10093,8 @@ spec: type: string use_default_for_unknown_topic: type: boolean + use_rdkafka: + type: boolean username: properties: mountFrom: diff --git a/config/crd/bases/logging.banzaicloud.io_clusteroutputs.yaml b/config/crd/bases/logging.banzaicloud.io_clusteroutputs.yaml index 4a5f05e03..9179a1e0f 100644 --- a/config/crd/bases/logging.banzaicloud.io_clusteroutputs.yaml +++ b/config/crd/bases/logging.banzaicloud.io_clusteroutputs.yaml @@ -3424,6 +3424,8 @@ spec: type: string use_default_for_unknown_topic: type: boolean + use_rdkafka: + type: boolean username: properties: mountFrom: @@ -10821,6 +10823,8 @@ spec: type: string use_default_for_unknown_topic: type: boolean + use_rdkafka: + type: boolean username: properties: mountFrom: diff --git a/config/crd/bases/logging.banzaicloud.io_outputs.yaml b/config/crd/bases/logging.banzaicloud.io_outputs.yaml index 222995229..7f24ee24f 100644 --- a/config/crd/bases/logging.banzaicloud.io_outputs.yaml +++ b/config/crd/bases/logging.banzaicloud.io_outputs.yaml @@ -3420,6 +3420,8 @@ spec: type: string use_default_for_unknown_topic: type: boolean + use_rdkafka: + type: boolean username: properties: mountFrom: @@ -10091,6 +10093,8 @@ spec: type: string use_default_for_unknown_topic: type: boolean + use_rdkafka: + type: boolean username: properties: mountFrom: diff --git a/docs/configuration/plugins/outputs/kafka.md b/docs/configuration/plugins/outputs/kafka.md index 9ca2e115e..ba6b81656 100644 --- a/docs/configuration/plugins/outputs/kafka.md +++ b/docs/configuration/plugins/outputs/kafka.md @@ -33,7 +33,9 @@ spec: ## Configuration ## Kafka -Send your logs to Kafka +Send your logs to Kafka. +Setting use_rdkafka to true opts for rdkafka2, which offers higher performance compared to ruby-kafka. +-[more info](https://github.com/fluent/fluent-plugin-kafka#output-plugin) ### ack_timeout (int, optional) {#kafka-ack_timeout} @@ -212,7 +214,7 @@ Client certificate key Verify certificate hostname -### sasl_over_ssl (bool, required) {#kafka-sasl_over_ssl} +### sasl_over_ssl (*bool, optional) {#kafka-sasl_over_ssl} SASL over SSL @@ -240,6 +242,11 @@ Use default for unknown topics Default: false +### use_rdkafka (bool, optional) {#kafka-use_rdkafka} + +Use rdkafka of the output plugin. + + ### username (*secret.Secret, optional) {#kafka-username} Username when using PLAIN/SCRAM SASL authentication diff --git a/pkg/sdk/logging/model/output/kafka.go b/pkg/sdk/logging/model/output/kafka.go index 732cea94d..475234e68 100644 --- a/pkg/sdk/logging/model/output/kafka.go +++ b/pkg/sdk/logging/model/output/kafka.go @@ -58,9 +58,12 @@ type _metaKafka interface{} //nolint:deadcode,unused // +kubebuilder:object:generate=true // +docName:"Kafka" -// Send your logs to Kafka +// Send your logs to Kafka. +// Setting use_rdkafka to true opts for rdkafka2, which offers higher performance compared to ruby-kafka. +// -[more info](https://github.com/fluent/fluent-plugin-kafka#output-plugin) type KafkaOutputConfig struct { - + // Use rdkafka of the output plugin. + UseRdkafka bool `json:"use_rdkafka,omitempty"` // The list of all seed brokers, with their host and port information. Brokers string `json:"brokers"` // Topic Key (default: "topic") @@ -95,7 +98,7 @@ type KafkaOutputConfig struct { Idempotent bool `json:"idempotent,omitempty"` // SASL over SSL (default: true) // +kubebuilder:validation:Optional - SaslOverSSL bool `json:"sasl_over_ssl"` + SaslOverSSL *bool `json:"sasl_over_ssl,omitempty"` Principal string `json:"principal,omitempty"` Keytab *secret.Secret `json:"keytab,omitempty"` // Username when using PLAIN/SCRAM SASL authentication @@ -141,7 +144,10 @@ type KafkaOutputConfig struct { } func (e *KafkaOutputConfig) ToDirective(secretLoader secret.SecretLoader, id string) (types.Directive, error) { - const pluginType = "kafka2" + pluginType := "kafka2" + if e.UseRdkafka { + pluginType = "rdkafka2" + } kafka := &types.OutputPlugin{ PluginMeta: types.PluginMeta{ Type: pluginType, @@ -171,5 +177,8 @@ func (e *KafkaOutputConfig) ToDirective(secretLoader secret.SecretLoader, id str kafka.SubDirectives = append(kafka.SubDirectives, format) } } + + // remove use_rdkafka from params, it is not a valid parameter for plugin config + delete(kafka.Params, "use_rdkafka") return kafka, nil } diff --git a/pkg/sdk/logging/model/output/kafka_test.go b/pkg/sdk/logging/model/output/kafka_test.go index f22a0f6ee..c49328736 100644 --- a/pkg/sdk/logging/model/output/kafka_test.go +++ b/pkg/sdk/logging/model/output/kafka_test.go @@ -62,3 +62,42 @@ buffer: test := render.NewOutputPluginTest(t, kafka) test.DiffResult(expected) } + +func TestRdkafka(t *testing.T) { + CONFIG := []byte(` +brokers: kafka-headless.kafka.svc.cluster.local:29092 +default_topic: topic +use_rdkafka: true +ssl_verify_hostname: false +format: + type: json +buffer: + timekey: 1m + timekey_wait: 30s + timekey_use_utc: true +`) + expected := ` + + @type rdkafka2 + @id test + brokers kafka-headless.kafka.svc.cluster.local:29092 + default_topic topic + ssl_verify_hostname false + + @type file + path /buffers/test.*.buffer + retry_forever true + timekey 1m + timekey_use_utc true + timekey_wait 30s + + + @type json + + +` + kafka := &output.KafkaOutputConfig{} + require.NoError(t, yaml.Unmarshal(CONFIG, kafka)) + test := render.NewOutputPluginTest(t, kafka) + test.DiffResult(expected) +} diff --git a/pkg/sdk/logging/model/output/zz_generated.deepcopy.go b/pkg/sdk/logging/model/output/zz_generated.deepcopy.go index df5cd8a0a..6c0ebc29d 100644 --- a/pkg/sdk/logging/model/output/zz_generated.deepcopy.go +++ b/pkg/sdk/logging/model/output/zz_generated.deepcopy.go @@ -706,6 +706,11 @@ func (in *KafkaOutputConfig) DeepCopyInto(out *KafkaOutputConfig) { (*out)[key] = val } } + if in.SaslOverSSL != nil { + in, out := &in.SaslOverSSL, &out.SaslOverSSL + *out = new(bool) + **out = **in + } if in.Keytab != nil { in, out := &in.Keytab, &out.Keytab *out = new(secret.Secret)