From 6dd76110a60cac2961a06eb5476843723cc8ab83 Mon Sep 17 00:00:00 2001 From: WalkerWang731 Date: Tue, 12 Jan 2021 18:51:10 +0800 Subject: [PATCH 1/4] add that suppot Kafka SASL/PLAIN authentication of SCRAM-SHA-256 or SCRAM-SHA-512 mechanism Signed-off-by: WalkerWang731 --- cmd/ingester/app/flags_test.go | 11 ++--- go.mod | 1 + go.sum | 2 + pkg/kafka/auth/config.go | 6 ++- pkg/kafka/auth/options.go | 16 ++++--- pkg/kafka/auth/plaintext.go | 64 ++++++++++++++++++++++++++-- plugin/storage/kafka/factory_test.go | 1 + plugin/storage/kafka/options_test.go | 11 ++--- 8 files changed, 93 insertions(+), 19 deletions(-) diff --git a/cmd/ingester/app/flags_test.go b/cmd/ingester/app/flags_test.go index 93be0d13f54..8523af1ee3e 100644 --- a/cmd/ingester/app/flags_test.go +++ b/cmd/ingester/app/flags_test.go @@ -55,29 +55,30 @@ func TestOptionsWithFlags(t *testing.T) { func TestTLSFlags(t *testing.T) { kerb := auth.KerberosConfig{ServiceName: "kafka", ConfigPath: "/etc/krb5.conf", KeyTabPath: "/etc/security/kafka.keytab"} + plain := auth.PlainTextConfig{UserName: "", Password: "", Mechanism: "PLAIN"} tests := []struct { flags []string expected auth.AuthenticationConfig }{ { flags: []string{}, - expected: auth.AuthenticationConfig{Authentication: "none", Kerberos: kerb}, + expected: auth.AuthenticationConfig{Authentication: "none", Kerberos: kerb, PlainText: plain}, }, { flags: []string{"--kafka.consumer.authentication=foo"}, - expected: auth.AuthenticationConfig{Authentication: "foo", Kerberos: kerb}, + expected: auth.AuthenticationConfig{Authentication: "foo", Kerberos: kerb, PlainText: plain}, }, { flags: []string{"--kafka.consumer.authentication=kerberos", "--kafka.consumer.tls.enabled=true"}, - expected: auth.AuthenticationConfig{Authentication: "kerberos", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}}, + expected: auth.AuthenticationConfig{Authentication: "kerberos", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}, PlainText: plain}, }, { flags: []string{"--kafka.consumer.authentication=tls"}, - expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}}, + expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}, PlainText: plain}, }, { flags: []string{"--kafka.consumer.authentication=tls", "--kafka.consumer.tls.enabled=false"}, - expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}}, + expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}, PlainText: plain}, }, } diff --git a/go.mod b/go.mod index 5c6299e7588..f099265a018 100644 --- a/go.mod +++ b/go.mod @@ -67,6 +67,7 @@ require ( github.com/uber/jaeger-lib v2.4.0+incompatible github.com/vektra/mockery v0.0.0-20181123154057-e78b021dcbb5 github.com/wadey/gocovmerge v0.0.0-20160331181800-b5bfa59ec0ad + github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c go.mongodb.org/mongo-driver v1.3.2 // indirect go.uber.org/atomic v1.6.0 go.uber.org/automaxprocs v1.3.0 diff --git a/go.sum b/go.sum index 7317ca9694d..67bd1db2f53 100644 --- a/go.sum +++ b/go.sum @@ -587,8 +587,10 @@ github.com/vektra/mockery v0.0.0-20181123154057-e78b021dcbb5 h1:Xim2mBRFdXzXmKRO github.com/vektra/mockery v0.0.0-20181123154057-e78b021dcbb5/go.mod h1:ppEjwdhyy7Y31EnHRDm1JkChoC7LXIJ7Ex0VYLWtZtQ= github.com/wadey/gocovmerge v0.0.0-20160331181800-b5bfa59ec0ad h1:W0LEBv82YCGEtcmPA3uNZBI33/qF//HAAs3MawDjRa0= github.com/wadey/gocovmerge v0.0.0-20160331181800-b5bfa59ec0ad/go.mod h1:Hy8o65+MXnS6EwGElrSRjUzQDLXreJlzYLlWiHtt8hM= +github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= github.com/xdg/stringprep v0.0.0-20180714160509-73f8eece6fdc/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= +github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0= github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= diff --git a/pkg/kafka/auth/config.go b/pkg/kafka/auth/config.go index c9e42e0c0a5..34f629e3904 100644 --- a/pkg/kafka/auth/config.go +++ b/pkg/kafka/auth/config.go @@ -68,7 +68,10 @@ func (config *AuthenticationConfig) SetConfiguration(saramaConfig *sarama.Config setKerberosConfiguration(&config.Kerberos, saramaConfig) return nil case plaintext: - setPlainTextConfiguration(&config.PlainText, saramaConfig) + err := setPlainTextConfiguration(&config.PlainText, saramaConfig) + if err != nil { + return err + } return nil default: return fmt.Errorf("Unknown/Unsupported authentication method %s to kafka cluster", config.Authentication) @@ -99,4 +102,5 @@ func (config *AuthenticationConfig) InitFromViper(configPrefix string, v *viper. config.PlainText.UserName = v.GetString(configPrefix + plainTextPrefix + suffixPlainTextUserName) config.PlainText.Password = v.GetString(configPrefix + plainTextPrefix + suffixPlainTextPassword) + config.PlainText.Mechanism = v.GetString(configPrefix + plainTextPrefix + suffixPlainTextMechanism) } diff --git a/pkg/kafka/auth/options.go b/pkg/kafka/auth/options.go index e2e7edb4804..d38efe83aa0 100644 --- a/pkg/kafka/auth/options.go +++ b/pkg/kafka/auth/options.go @@ -43,12 +43,14 @@ const ( defaultKerberosUsername = "" defaultKerberosKeyTab = "/etc/security/kafka.keytab" - plainTextPrefix = ".plaintext" - suffixPlainTextUserName = ".username" - suffixPlainTextPassword = ".password" + plainTextPrefix = ".plaintext" + suffixPlainTextUserName = ".username" + suffixPlainTextPassword = ".password" + suffixPlainTextMechanism = ".mechanism" - defaultPlainTextUserName = "" - defaultPlainTextPassword = "" + defaultPlainTextUserName = "" + defaultPlainTextPassword = "" + defaultPlainTextMechanism = "PLAIN" ) func addKerberosFlags(configPrefix string, flagSet *flag.FlagSet) { @@ -91,6 +93,10 @@ func addPlainTextFlags(configPrefix string, flagSet *flag.FlagSet) { configPrefix+plainTextPrefix+suffixPlainTextPassword, defaultPlainTextPassword, "The plaintext Password for SASL/PLAIN authentication") + flagSet.String( + configPrefix+plainTextPrefix+suffixPlainTextMechanism, + defaultPlainTextMechanism, + "The plaintext Mechanism for SASL/PLAIN authentication, e.g. 'SCRAM-SHA-256' or 'SCRAM-SHA-512' or 'PLAIN'") } // AddFlags add configuration flags to a flagSet. diff --git a/pkg/kafka/auth/plaintext.go b/pkg/kafka/auth/plaintext.go index 277d5d3c58f..e8f10cdfda5 100644 --- a/pkg/kafka/auth/plaintext.go +++ b/pkg/kafka/auth/plaintext.go @@ -15,17 +15,75 @@ package auth import ( + "crypto/sha256" + "crypto/sha512" + "fmt" + "hash" + "strings" + "github.com/Shopify/sarama" + "github.com/xdg/scram" ) +// XDGSCRAMClient is return a *sarama.SCRAMClient on create SCRAMClientGeneratorFunc when the mechanism is SCRAM-SHA-256 or SCRAM-SHA-512 +type XDGSCRAMClient struct { + *scram.Client + *scram.ClientConversation + scram.HashGeneratorFcn +} + +// Begin prepares the client for the SCRAM exchange +// with the server with a user name and a password +func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) { + x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID) + if err != nil { + return err + } + x.ClientConversation = x.Client.NewConversation() + return nil +} + +// Step steps client through the SCRAM exchange. It is +// called repeatedly until it errors or `Done` returns true. +func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) { + response, err = x.ClientConversation.Step(challenge) + return +} + +// Done should return true when the SCRAM conversation +// is over. +func (x *XDGSCRAMClient) Done() bool { + return x.ClientConversation.Done() +} + // PlainTextConfig describes the configuration properties needed for SASL/PLAIN with kafka type PlainTextConfig struct { - UserName string `mapstructure:"username"` - Password string `mapstructure:"password" json:"-"` + UserName string `mapstructure:"username"` + Password string `mapstructure:"password" json:"-"` + Mechanism string `mapstructure:"mechanism"` } -func setPlainTextConfiguration(config *PlainTextConfig, saramaConfig *sarama.Config) { +func setPlainTextConfiguration(config *PlainTextConfig, saramaConfig *sarama.Config) error { saramaConfig.Net.SASL.Enable = true saramaConfig.Net.SASL.User = config.UserName saramaConfig.Net.SASL.Password = config.Password + switch strings.ToUpper(config.Mechanism) { + case "SCRAM-SHA-256": + saramaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { + return &XDGSCRAMClient{HashGeneratorFcn: func() hash.Hash { return sha256.New() }} + } + saramaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256 + case "SCRAM-SHA-512": + saramaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { + return &XDGSCRAMClient{HashGeneratorFcn: func() hash.Hash { return sha512.New() }} + } + saramaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512 + case "PLAIN": + saramaConfig.Net.SASL.Mechanism = sarama.SASLTypePlaintext + + default: + return fmt.Errorf("config plaintext.mechanism error: %s, only support 'SCRAM-SHA-256' or 'SCRAM-SHA-512' or 'PLAIN'", config.Mechanism) + + } + return nil } diff --git a/plugin/storage/kafka/factory_test.go b/plugin/storage/kafka/factory_test.go index e6538f7c278..891652e22bd 100644 --- a/plugin/storage/kafka/factory_test.go +++ b/plugin/storage/kafka/factory_test.go @@ -120,6 +120,7 @@ func TestKafkaFactoryDoesNotLogPassword(t *testing.T) { "--kafka.producer.authentication=plaintext", "--kafka.producer.plaintext.username=username", "--kafka.producer.plaintext.password=SECRET", + "--kafka.producer.plaintext.mechanism=PLAINT", "--kafka.producer.brokers=localhost:9092", }, }, diff --git a/plugin/storage/kafka/options_test.go b/plugin/storage/kafka/options_test.go index 5fa6f249301..b01ff66ce40 100644 --- a/plugin/storage/kafka/options_test.go +++ b/plugin/storage/kafka/options_test.go @@ -173,29 +173,30 @@ func TestRequiredAcksFailures(t *testing.T) { func TestTLSFlags(t *testing.T) { kerb := auth.KerberosConfig{ServiceName: "kafka", ConfigPath: "/etc/krb5.conf", KeyTabPath: "/etc/security/kafka.keytab"} + plain := auth.PlainTextConfig{UserName: "", Password: "", Mechanism: "PLAIN"} tests := []struct { flags []string expected auth.AuthenticationConfig }{ { flags: []string{}, - expected: auth.AuthenticationConfig{Authentication: "none", Kerberos: kerb}, + expected: auth.AuthenticationConfig{Authentication: "none", Kerberos: kerb, PlainText: plain}, }, { flags: []string{"--kafka.producer.authentication=foo"}, - expected: auth.AuthenticationConfig{Authentication: "foo", Kerberos: kerb}, + expected: auth.AuthenticationConfig{Authentication: "foo", Kerberos: kerb, PlainText: plain}, }, { flags: []string{"--kafka.producer.authentication=kerberos", "--kafka.producer.tls.enabled=true"}, - expected: auth.AuthenticationConfig{Authentication: "kerberos", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}}, + expected: auth.AuthenticationConfig{Authentication: "kerberos", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}, PlainText: plain}, }, { flags: []string{"--kafka.producer.authentication=tls"}, - expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}}, + expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}, PlainText: plain}, }, { flags: []string{"--kafka.producer.authentication=tls", "--kafka.producer.tls.enabled=false"}, - expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}}, + expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}, PlainText: plain}, }, } From 2d41e7cbffd701370395c511428afff44718910f Mon Sep 17 00:00:00 2001 From: WalkerWang731 Date: Wed, 13 Jan 2021 14:16:36 +0800 Subject: [PATCH 2/4] rename XDGSCRAMClient to scramClient and remove paramater that no point on factory_test.go Signed-off-by: WalkerWang731 --- pkg/kafka/auth/plaintext.go | 14 +++++++------- plugin/storage/kafka/factory_test.go | 1 - 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/pkg/kafka/auth/plaintext.go b/pkg/kafka/auth/plaintext.go index e8f10cdfda5..4b797afd9e4 100644 --- a/pkg/kafka/auth/plaintext.go +++ b/pkg/kafka/auth/plaintext.go @@ -25,8 +25,8 @@ import ( "github.com/xdg/scram" ) -// XDGSCRAMClient is return a *sarama.SCRAMClient on create SCRAMClientGeneratorFunc when the mechanism is SCRAM-SHA-256 or SCRAM-SHA-512 -type XDGSCRAMClient struct { +// scramClient is the client to use when the auth mechanism is SCRAM +type scramClient struct { *scram.Client *scram.ClientConversation scram.HashGeneratorFcn @@ -34,7 +34,7 @@ type XDGSCRAMClient struct { // Begin prepares the client for the SCRAM exchange // with the server with a user name and a password -func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) { +func (x *scramClient) Begin(userName, password, authzID string) (err error) { x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID) if err != nil { return err @@ -45,14 +45,14 @@ func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) { // Step steps client through the SCRAM exchange. It is // called repeatedly until it errors or `Done` returns true. -func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) { +func (x *scramClient) Step(challenge string) (response string, err error) { response, err = x.ClientConversation.Step(challenge) return } // Done should return true when the SCRAM conversation // is over. -func (x *XDGSCRAMClient) Done() bool { +func (x *scramClient) Done() bool { return x.ClientConversation.Done() } @@ -70,12 +70,12 @@ func setPlainTextConfiguration(config *PlainTextConfig, saramaConfig *sarama.Con switch strings.ToUpper(config.Mechanism) { case "SCRAM-SHA-256": saramaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { - return &XDGSCRAMClient{HashGeneratorFcn: func() hash.Hash { return sha256.New() }} + return &scramClient{HashGeneratorFcn: func() hash.Hash { return sha256.New() }} } saramaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256 case "SCRAM-SHA-512": saramaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { - return &XDGSCRAMClient{HashGeneratorFcn: func() hash.Hash { return sha512.New() }} + return &scramClient{HashGeneratorFcn: func() hash.Hash { return sha512.New() }} } saramaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512 case "PLAIN": diff --git a/plugin/storage/kafka/factory_test.go b/plugin/storage/kafka/factory_test.go index 891652e22bd..e6538f7c278 100644 --- a/plugin/storage/kafka/factory_test.go +++ b/plugin/storage/kafka/factory_test.go @@ -120,7 +120,6 @@ func TestKafkaFactoryDoesNotLogPassword(t *testing.T) { "--kafka.producer.authentication=plaintext", "--kafka.producer.plaintext.username=username", "--kafka.producer.plaintext.password=SECRET", - "--kafka.producer.plaintext.mechanism=PLAINT", "--kafka.producer.brokers=localhost:9092", }, }, From 2c00b3da52356c7ba24124a406fa20dffdd31a01 Mon Sep 17 00:00:00 2001 From: WalkerWang731 Date: Thu, 14 Jan 2021 22:41:36 +0800 Subject: [PATCH 3/4] add type assertion Signed-off-by: WalkerWang731 --- pkg/kafka/auth/plaintext.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/kafka/auth/plaintext.go b/pkg/kafka/auth/plaintext.go index 4b797afd9e4..a9542cb3d0b 100644 --- a/pkg/kafka/auth/plaintext.go +++ b/pkg/kafka/auth/plaintext.go @@ -63,6 +63,8 @@ type PlainTextConfig struct { Mechanism string `mapstructure:"mechanism"` } +var _ sarama.SCRAMClient = (*scramClient)(nil) + func setPlainTextConfiguration(config *PlainTextConfig, saramaConfig *sarama.Config) error { saramaConfig.Net.SASL.Enable = true saramaConfig.Net.SASL.User = config.UserName @@ -86,4 +88,4 @@ func setPlainTextConfiguration(config *PlainTextConfig, saramaConfig *sarama.Con } return nil -} +} \ No newline at end of file From 5d19011b517a772b72d58f36cff15fff6adab195 Mon Sep 17 00:00:00 2001 From: WalkerWang731 Date: Thu, 14 Jan 2021 23:01:45 +0800 Subject: [PATCH 4/4] replacement UserName to Username Signed-off-by: WalkerWang731 --- cmd/ingester/app/flags_test.go | 2 +- .../app/exporter/kafkaexporter/kafka_exporter.go | 2 +- .../app/receiver/kafkareceiver/kafka_receiver.go | 2 +- pkg/kafka/auth/config.go | 4 ++-- pkg/kafka/auth/options.go | 12 ++++++------ pkg/kafka/auth/plaintext.go | 6 +++--- plugin/storage/kafka/options_test.go | 2 +- 7 files changed, 15 insertions(+), 15 deletions(-) diff --git a/cmd/ingester/app/flags_test.go b/cmd/ingester/app/flags_test.go index 8523af1ee3e..5659dbc4bc4 100644 --- a/cmd/ingester/app/flags_test.go +++ b/cmd/ingester/app/flags_test.go @@ -55,7 +55,7 @@ func TestOptionsWithFlags(t *testing.T) { func TestTLSFlags(t *testing.T) { kerb := auth.KerberosConfig{ServiceName: "kafka", ConfigPath: "/etc/krb5.conf", KeyTabPath: "/etc/security/kafka.keytab"} - plain := auth.PlainTextConfig{UserName: "", Password: "", Mechanism: "PLAIN"} + plain := auth.PlainTextConfig{Username: "", Password: "", Mechanism: "PLAIN"} tests := []struct { flags []string expected auth.AuthenticationConfig diff --git a/cmd/opentelemetry/app/exporter/kafkaexporter/kafka_exporter.go b/cmd/opentelemetry/app/exporter/kafkaexporter/kafka_exporter.go index 4cf4c583b66..49e84f20941 100644 --- a/cmd/opentelemetry/app/exporter/kafkaexporter/kafka_exporter.go +++ b/cmd/opentelemetry/app/exporter/kafkaexporter/kafka_exporter.go @@ -77,7 +77,7 @@ func (f Factory) CreateDefaultConfig() configmodels.Exporter { if opts.Config.Authentication == "plaintext" { cfg.Authentication.PlainText = &kafkaexporter.PlainTextConfig{ - Username: opts.Config.PlainText.UserName, + Username: opts.Config.PlainText.Username, Password: opts.Config.PlainText.Password, } } diff --git a/cmd/opentelemetry/app/receiver/kafkareceiver/kafka_receiver.go b/cmd/opentelemetry/app/receiver/kafkareceiver/kafka_receiver.go index 912dc906de9..514b5e99a78 100644 --- a/cmd/opentelemetry/app/receiver/kafkareceiver/kafka_receiver.go +++ b/cmd/opentelemetry/app/receiver/kafkareceiver/kafka_receiver.go @@ -84,7 +84,7 @@ func (f *Factory) CreateDefaultConfig() configmodels.Receiver { if opts.Authentication == "plaintext" { cfg.Authentication.PlainText = &kafkaexporter.PlainTextConfig{ - Username: opts.PlainText.UserName, + Username: opts.PlainText.Username, Password: opts.PlainText.Password, } } diff --git a/pkg/kafka/auth/config.go b/pkg/kafka/auth/config.go index 34f629e3904..a133b1a29de 100644 --- a/pkg/kafka/auth/config.go +++ b/pkg/kafka/auth/config.go @@ -84,7 +84,7 @@ func (config *AuthenticationConfig) InitFromViper(configPrefix string, v *viper. config.Kerberos.ServiceName = v.GetString(configPrefix + kerberosPrefix + suffixKerberosServiceName) config.Kerberos.Realm = v.GetString(configPrefix + kerberosPrefix + suffixKerberosRealm) config.Kerberos.UseKeyTab = v.GetBool(configPrefix + kerberosPrefix + suffixKerberosUseKeyTab) - config.Kerberos.Username = v.GetString(configPrefix + kerberosPrefix + suffixKerberosUserName) + config.Kerberos.Username = v.GetString(configPrefix + kerberosPrefix + suffixKerberosUsername) config.Kerberos.Password = v.GetString(configPrefix + kerberosPrefix + suffixKerberosPassword) config.Kerberos.ConfigPath = v.GetString(configPrefix + kerberosPrefix + suffixKerberosConfig) config.Kerberos.KeyTabPath = v.GetString(configPrefix + kerberosPrefix + suffixKerberosKeyTab) @@ -100,7 +100,7 @@ func (config *AuthenticationConfig) InitFromViper(configPrefix string, v *viper. config.TLS.Enabled = true } - config.PlainText.UserName = v.GetString(configPrefix + plainTextPrefix + suffixPlainTextUserName) + config.PlainText.Username = v.GetString(configPrefix + plainTextPrefix + suffixPlainTextUsername) config.PlainText.Password = v.GetString(configPrefix + plainTextPrefix + suffixPlainTextPassword) config.PlainText.Mechanism = v.GetString(configPrefix + plainTextPrefix + suffixPlainTextMechanism) } diff --git a/pkg/kafka/auth/options.go b/pkg/kafka/auth/options.go index d38efe83aa0..44e860ced72 100644 --- a/pkg/kafka/auth/options.go +++ b/pkg/kafka/auth/options.go @@ -30,7 +30,7 @@ const ( suffixKerberosServiceName = ".service-name" suffixKerberosRealm = ".realm" suffixKerberosUseKeyTab = ".use-keytab" - suffixKerberosUserName = ".username" + suffixKerberosUsername = ".username" suffixKerberosPassword = ".password" suffixKerberosConfig = ".config-file" suffixKerberosKeyTab = ".keytab-file" @@ -44,11 +44,11 @@ const ( defaultKerberosKeyTab = "/etc/security/kafka.keytab" plainTextPrefix = ".plaintext" - suffixPlainTextUserName = ".username" + suffixPlainTextUsername = ".username" suffixPlainTextPassword = ".password" suffixPlainTextMechanism = ".mechanism" - defaultPlainTextUserName = "" + defaultPlainTextUsername = "" defaultPlainTextPassword = "" defaultPlainTextMechanism = "PLAIN" ) @@ -67,7 +67,7 @@ func addKerberosFlags(configPrefix string, flagSet *flag.FlagSet) { defaultKerberosPassword, "The Kerberos password used for authenticate with KDC") flagSet.String( - configPrefix+kerberosPrefix+suffixKerberosUserName, + configPrefix+kerberosPrefix+suffixKerberosUsername, defaultKerberosUsername, "The Kerberos username used for authenticate with KDC") flagSet.String( @@ -86,8 +86,8 @@ func addKerberosFlags(configPrefix string, flagSet *flag.FlagSet) { func addPlainTextFlags(configPrefix string, flagSet *flag.FlagSet) { flagSet.String( - configPrefix+plainTextPrefix+suffixPlainTextUserName, - defaultPlainTextUserName, + configPrefix+plainTextPrefix+suffixPlainTextUsername, + defaultPlainTextUsername, "The plaintext Username for SASL/PLAIN authentication") flagSet.String( configPrefix+plainTextPrefix+suffixPlainTextPassword, diff --git a/pkg/kafka/auth/plaintext.go b/pkg/kafka/auth/plaintext.go index a9542cb3d0b..1bb95334ec7 100644 --- a/pkg/kafka/auth/plaintext.go +++ b/pkg/kafka/auth/plaintext.go @@ -58,7 +58,7 @@ func (x *scramClient) Done() bool { // PlainTextConfig describes the configuration properties needed for SASL/PLAIN with kafka type PlainTextConfig struct { - UserName string `mapstructure:"username"` + Username string `mapstructure:"username"` Password string `mapstructure:"password" json:"-"` Mechanism string `mapstructure:"mechanism"` } @@ -67,7 +67,7 @@ var _ sarama.SCRAMClient = (*scramClient)(nil) func setPlainTextConfiguration(config *PlainTextConfig, saramaConfig *sarama.Config) error { saramaConfig.Net.SASL.Enable = true - saramaConfig.Net.SASL.User = config.UserName + saramaConfig.Net.SASL.User = config.Username saramaConfig.Net.SASL.Password = config.Password switch strings.ToUpper(config.Mechanism) { case "SCRAM-SHA-256": @@ -88,4 +88,4 @@ func setPlainTextConfiguration(config *PlainTextConfig, saramaConfig *sarama.Con } return nil -} \ No newline at end of file +} diff --git a/plugin/storage/kafka/options_test.go b/plugin/storage/kafka/options_test.go index b01ff66ce40..f90b6040650 100644 --- a/plugin/storage/kafka/options_test.go +++ b/plugin/storage/kafka/options_test.go @@ -173,7 +173,7 @@ func TestRequiredAcksFailures(t *testing.T) { func TestTLSFlags(t *testing.T) { kerb := auth.KerberosConfig{ServiceName: "kafka", ConfigPath: "/etc/krb5.conf", KeyTabPath: "/etc/security/kafka.keytab"} - plain := auth.PlainTextConfig{UserName: "", Password: "", Mechanism: "PLAIN"} + plain := auth.PlainTextConfig{Username: "", Password: "", Mechanism: "PLAIN"} tests := []struct { flags []string expected auth.AuthenticationConfig