diff --git a/cdc/sink/producer/kafka/config_test.go b/cdc/sink/producer/kafka/config_test.go index b52f619ef4c..4b9f6b59dc1 100644 --- a/cdc/sink/producer/kafka/config_test.go +++ b/cdc/sink/producer/kafka/config_test.go @@ -421,3 +421,70 @@ func TestConfigurationCombinations(t *testing.T) { _ = adminClient.Close() } } + +func TestApplySASL(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + URI string + exceptErr string + }{ + { + name: "no params", + URI: "kafka://127.0.0.1:9092/abc", + exceptErr: "", + }, + { + name: "valid PLAIN SASL", + URI: "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&partition-num=0" + + "&sasl-username=user&sasl-password=password&sasl-mechanism=plain", + exceptErr: "", + }, + { + name: "valid SCRAM SASL", + URI: "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&partition-num=0" + + "&sasl-username=user&sasl-password=password&sasl-mechanism=SCRAM-SHA-512", + exceptErr: "", + }, + { + name: "valid GSSAPI user auth SASL", + URI: "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&partition-num=0" + + "&sasl-mechanism=GSSAPI&sasl-gssapi-auth-type=USER&sasl-gssapi-kerberos-config-path=/root/config" + + "&sasl-gssapi-service-name=a&sasl-gssapi-user=user&sasl-gssapi-password=pwd" + + "&sasl-gssapi-realm=realm&sasl-gssapi-disable-pafxfast=false", + exceptErr: "", + }, + { + name: "valid GSSAPI keytab auth SASL", + URI: "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&partition-num=0" + + "&sasl-mechanism=GSSAPI&sasl-gssapi-auth-type=keytab&sasl-gssapi-kerberos-config-path=/root/config" + + "&sasl-gssapi-service-name=a&sasl-gssapi-user=user&sasl-gssapi-keytab-path=/root/keytab" + + "&sasl-gssapi-realm=realm&sasl-gssapi-disable-pafxfast=false", + exceptErr: "", + }, + { + name: "invalid mechanism", + URI: "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&partition-num=0" + + "&sasl-mechanism=a", + exceptErr: "unknown a SASL mechanism", + }, + { + name: "invalid GSSAPI auth type", + URI: "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&partition-num=0" + + "&sasl-mechanism=gssapi&sasl-gssapi-auth-type=keyta1b", + exceptErr: "unknown keyta1b auth type", + }, + } + + for _, test := range tests { + cfg := NewConfig() + sinkURI, err := url.Parse(test.URI) + require.Nil(t, err) + if test.exceptErr == "" { + require.Nil(t, cfg.applySASL(sinkURI.Query())) + } else { + require.Regexp(t, test.exceptErr, cfg.applySASL(sinkURI.Query()).Error()) + } + } +} diff --git a/pkg/security/sasl_scram.go b/pkg/security/sasl.go similarity index 95% rename from pkg/security/sasl_scram.go rename to pkg/security/sasl.go index e064f98d6f2..23e4e16bfe7 100644 --- a/pkg/security/sasl_scram.go +++ b/pkg/security/sasl.go @@ -14,7 +14,6 @@ package security import ( - "fmt" "strings" "github.com/Shopify/sarama" @@ -50,7 +49,7 @@ func SASLMechanismFormString(s string) (SASLMechanism, error) { case "gssapi": return GSSAPIMechanism, nil default: - return UnknownMechanism, errors.Errorf("unknown SASL mechanism %s", s) + return UnknownMechanism, errors.Errorf("unknown %s SASL mechanism", s) } } @@ -82,7 +81,7 @@ func AuthTypeFromString(s string) (GSSAPIAuthType, error) { case "keytab": return KeyTabAuth, nil default: - return UnknownAuth, errors.New(fmt.Sprintf("unknown %s auth type", s)) + return UnknownAuth, errors.Errorf("unknown %s auth type", s) } }