Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sink/kafka(ticdc): support PLAIN and GSSAPI auth #5122

Merged
merged 6 commits into from
Apr 11, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 129 additions & 45 deletions cdc/sink/producer/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type Config struct {
Compression string
ClientID string
Credential *security.Credential
SaslScram *security.SaslScram
SASL *security.SASL
// control whether to create topic
AutoCreate bool

Expand All @@ -62,7 +62,7 @@ func NewConfig() *Config {
ReplicationFactor: 1,
Compression: "none",
Credential: &security.Credential{},
SaslScram: &security.SaslScram{},
SASL: &security.SASL{},
AutoCreate: true,
DialTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
Expand Down Expand Up @@ -97,22 +97,6 @@ func (c *Config) setPartitionNum(realPartitionCount int32) error {
return nil
}

// AutoCreateTopicConfig is used to create topic configuration.
type AutoCreateTopicConfig struct {
AutoCreate bool
PartitionNum int32
ReplicationFactor int16
}

// DeriveTopicConfig derive a `topicConfig` from the `Config`
func (c *Config) DeriveTopicConfig() *AutoCreateTopicConfig {
return &AutoCreateTopicConfig{
AutoCreate: c.AutoCreate,
PartitionNum: c.PartitionNum,
ReplicationFactor: c.ReplicationFactor,
}
}

// Apply the sinkURI to update Config
func (c *Config) Apply(sinkURI *url.URL) error {
c.BrokerEndpoints = strings.Split(sinkURI.Host, ",")
Expand Down Expand Up @@ -174,21 +158,6 @@ func (c *Config) Apply(sinkURI *url.URL) error {
c.Credential.KeyPath = s
}

s = params.Get("sasl-user")
if s != "" {
c.SaslScram.SaslUser = s
}

s = params.Get("sasl-password")
if s != "" {
c.SaslScram.SaslPassword = s
}

s = params.Get("sasl-mechanism")
if s != "" {
c.SaslScram.SaslMechanism = s
}

s = params.Get("auto-create-topic")
if s != "" {
autoCreate, err := strconv.ParseBool(s)
Expand Down Expand Up @@ -225,9 +194,101 @@ func (c *Config) Apply(sinkURI *url.URL) error {
c.ReadTimeout = a
}

err := c.applySASL(params)
if err != nil {
return err
}

return nil
}

func (c *Config) applySASL(params url.Values) error {
s := params.Get("sasl-user")
if s != "" {
c.SASL.SASLUser = s
}

s = params.Get("sasl-password")
if s != "" {
c.SASL.SASLPassword = s
}

s = params.Get("sasl-mechanism")
if s != "" {
mechanism, err := security.SASLMechanismFormString(s)
if err != nil {
return cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
}
c.SASL.SASLMechanism = mechanism
}

s = params.Get("sasl-gssapi-auth-type")
if s != "" {
authType, err := security.AuthTypeFromString(s)
if err != nil {
return cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
}
c.SASL.GSSAPI.AuthType = authType
}

s = params.Get("sasl-gssapi-keytab-path")
if s != "" {
c.SASL.GSSAPI.KeyTabPath = s
}

s = params.Get("sasl-gssapi-kerberos-config-path")
if s != "" {
c.SASL.GSSAPI.KerberosConfigPath = s
}

s = params.Get("sasl-gssapi-service-name")
if s != "" {
c.SASL.GSSAPI.ServiceName = s
}

s = params.Get("sasl-gssapi-user")
if s != "" {
c.SASL.GSSAPI.Username = s
}

s = params.Get("sasl-gssapi-password")
if s != "" {
c.SASL.GSSAPI.Password = s
}

s = params.Get("sasl-gssapi-realm")
if s != "" {
c.SASL.GSSAPI.Realm = s
}

s = params.Get("sasl-gssapi-disable-pafxfast")
if s != "" {
disablePAFXFAST, err := strconv.ParseBool(s)
if err != nil {
return err
}
c.SASL.GSSAPI.DisablePAFXFAST = disablePAFXFAST
}

return nil
}

// AutoCreateTopicConfig is used to create topic configuration.
type AutoCreateTopicConfig struct {
AutoCreate bool
PartitionNum int32
ReplicationFactor int16
}

// DeriveTopicConfig derive a `topicConfig` from the `Config`
func (c *Config) DeriveTopicConfig() *AutoCreateTopicConfig {
return &AutoCreateTopicConfig{
AutoCreate: c.AutoCreate,
PartitionNum: c.PartitionNum,
ReplicationFactor: c.ReplicationFactor,
}
}

// NewSaramaConfig return the default config and set the according version and metrics
func NewSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) {
config := sarama.NewConfig()
Expand Down Expand Up @@ -312,19 +373,42 @@ func NewSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) {
return nil, errors.Trace(err)
}
}
if c.SaslScram != nil && len(c.SaslScram.SaslUser) != 0 {

completeSaramaSASLConfig(config, c)

return config, err
}

func completeSaramaSASLConfig(config *sarama.Config, c *Config) {
if c.SASL != nil && c.SASL.SASLMechanism != "" {
config.Net.SASL.Enable = true
config.Net.SASL.User = c.SaslScram.SaslUser
config.Net.SASL.Password = c.SaslScram.SaslPassword
config.Net.SASL.Mechanism = sarama.SASLMechanism(c.SaslScram.SaslMechanism)
if strings.EqualFold(c.SaslScram.SaslMechanism, "SCRAM-SHA-256") {
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &security.XDGSCRAMClient{HashGeneratorFcn: security.SHA256} }
} else if strings.EqualFold(c.SaslScram.SaslMechanism, "SCRAM-SHA-512") {
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &security.XDGSCRAMClient{HashGeneratorFcn: security.SHA512} }
} else {
return nil, errors.New("Unsupported sasl-mechanism, should be SCRAM-SHA-256 or SCRAM-SHA-512")
config.Net.SASL.Mechanism = sarama.SASLMechanism(c.SASL.SASLMechanism)
switch c.SASL.SASLMechanism {
case sarama.SASLTypeSCRAMSHA256, sarama.SASLTypeSCRAMSHA512, sarama.SASLTypePlaintext:
config.Net.SASL.User = c.SASL.SASLUser
config.Net.SASL.Password = c.SASL.SASLPassword
if strings.EqualFold(string(c.SASL.SASLMechanism), sarama.SASLTypeSCRAMSHA256) {
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &security.XDGSCRAMClient{HashGeneratorFcn: security.SHA256}
}
} else if strings.EqualFold(string(c.SASL.SASLMechanism), sarama.SASLTypeSCRAMSHA512) {
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &security.XDGSCRAMClient{HashGeneratorFcn: security.SHA512}
}
}
case sarama.SASLTypeGSSAPI:
config.Net.SASL.GSSAPI.AuthType = int(c.SASL.GSSAPI.AuthType)
config.Net.SASL.GSSAPI.Username = c.SASL.GSSAPI.Username
config.Net.SASL.GSSAPI.ServiceName = c.SASL.GSSAPI.ServiceName
config.Net.SASL.GSSAPI.KerberosConfigPath = c.SASL.GSSAPI.KerberosConfigPath
config.Net.SASL.GSSAPI.Realm = c.SASL.GSSAPI.Realm
config.Net.SASL.GSSAPI.DisablePAFXFAST = c.SASL.GSSAPI.DisablePAFXFAST
switch c.SASL.GSSAPI.AuthType {
case security.UserAuth:
config.Net.SASL.GSSAPI.Password = c.SASL.GSSAPI.Password
case security.KeyTabAuth:
config.Net.SASL.GSSAPI.KeyTabPath = c.SASL.GSSAPI.KeyTabPath
}
}
}

return config, err
}
115 changes: 111 additions & 4 deletions cdc/sink/producer/kafka/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,10 @@ func TestNewSaramaConfig(t *testing.T) {
saslConfig := NewConfig()
saslConfig.Version = "2.6.0"
saslConfig.ClientID = "test-sasl-scram"
saslConfig.SaslScram = &security.SaslScram{
SaslUser: "user",
SaslPassword: "password",
SaslMechanism: sarama.SASLTypeSCRAMSHA256,
saslConfig.SASL = &security.SASL{
SASLUser: "user",
SASLPassword: "password",
SASLMechanism: sarama.SASLTypeSCRAMSHA256,
}

cfg, err := NewSaramaConfig(ctx, saslConfig)
Expand Down Expand Up @@ -421,3 +421,110 @@ func TestConfigurationCombinations(t *testing.T) {
_ = adminClient.Close()
}
}

func TestApplySASL(t *testing.T) {
t.Parallel()

tests := []struct {
name string
URI string
exceptErr string
}{
{
name: "no params",
URI: "kafka://127.0.0.1:9092/abc",
exceptErr: "",
},
{
name: "valid PLAIN SASL",
URI: "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&partition-num=0" +
"&sasl-username=user&sasl-password=password&sasl-mechanism=plain",
exceptErr: "",
},
{
name: "valid SCRAM SASL",
URI: "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&partition-num=0" +
"&sasl-username=user&sasl-password=password&sasl-mechanism=SCRAM-SHA-512",
exceptErr: "",
},
{
name: "valid GSSAPI user auth SASL",
URI: "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&partition-num=0" +
"&sasl-mechanism=GSSAPI&sasl-gssapi-auth-type=USER" +
"&sasl-gssapi-kerberos-config-path=/root/config" +
"&sasl-gssapi-service-name=a&sasl-gssapi-user=user" +
"&sasl-gssapi-password=pwd" +
"&sasl-gssapi-realm=realm&sasl-gssapi-disable-pafxfast=false",
exceptErr: "",
},
{
name: "valid GSSAPI keytab auth SASL",
URI: "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&partition-num=0" +
"&sasl-mechanism=GSSAPI&sasl-gssapi-auth-type=keytab" +
"&sasl-gssapi-kerberos-config-path=/root/config" +
"&sasl-gssapi-service-name=a&sasl-gssapi-user=user" +
"&sasl-gssapi-keytab-path=/root/keytab" +
"&sasl-gssapi-realm=realm&sasl-gssapi-disable-pafxfast=false",
exceptErr: "",
},
{
name: "invalid mechanism",
URI: "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&partition-num=0" +
"&sasl-mechanism=a",
exceptErr: "unknown a SASL mechanism",
},
{
name: "invalid GSSAPI auth type",
URI: "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&partition-num=0" +
"&sasl-mechanism=gssapi&sasl-gssapi-auth-type=keyta1b",
exceptErr: "unknown keyta1b auth type",
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
t.Parallel()
cfg := NewConfig()
sinkURI, err := url.Parse(test.URI)
require.Nil(t, err)
if test.exceptErr == "" {
require.Nil(t, cfg.applySASL(sinkURI.Query()))
} else {
require.Regexp(t, test.exceptErr, cfg.applySASL(sinkURI.Query()).Error())
}
})
}
}

func TestCompleteSaramaSASLConfig(t *testing.T) {
t.Parallel()

// Test that SASL is turned on correctly.
cfg := NewConfig()
cfg.SASL = &security.SASL{
SASLUser: "user",
SASLPassword: "password",
SASLMechanism: "",
GSSAPI: security.GSSAPI{},
}
saramaConfig := sarama.NewConfig()
completeSaramaSASLConfig(saramaConfig, cfg)
require.False(t, saramaConfig.Net.SASL.Enable)
cfg.SASL.SASLMechanism = "plain"
completeSaramaSASLConfig(saramaConfig, cfg)
require.True(t, saramaConfig.Net.SASL.Enable)
// Test that the SCRAMClientGeneratorFunc is set up correctly.
cfg = NewConfig()
cfg.SASL = &security.SASL{
SASLUser: "user",
SASLPassword: "password",
SASLMechanism: "plain",
GSSAPI: security.GSSAPI{},
}
saramaConfig = sarama.NewConfig()
completeSaramaSASLConfig(saramaConfig, cfg)
require.Nil(t, saramaConfig.Net.SASL.SCRAMClientGeneratorFunc)
cfg.SASL.SASLMechanism = "SCRAM-SHA-512"
completeSaramaSASLConfig(saramaConfig, cfg)
require.NotNil(t, saramaConfig.Net.SASL.SCRAMClientGeneratorFunc)
}
Loading