Skip to content

Commit

Permalink
kafka(ticdc): fix panic issue when sasl enabled by config file (#9668)
Browse files Browse the repository at this point in the history
close #9669
  • Loading branch information
sdojjy authored Sep 7, 2023
1 parent 637d6c2 commit 2ab025e
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 0 deletions.
17 changes: 17 additions & 0 deletions cdc/sink/codec/common/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,3 +313,20 @@ func TestOpenProtocolHandleKeyOnly(t *testing.T) {
err = codecConfig.Validate()
require.NoError(t, err)
}

func TestKafkaConfigWithoutHandleKey(t *testing.T) {
t.Parallel()

// handle-key-only not enabled, always no error
replicaConfig := config.GetDefaultReplicaConfig()
replicaConfig.Sink.KafkaConfig = &config.KafkaConfig{}

uri := "kafka://127.0.0.1:9092/canal-json?protocol=canal-json"
sinkURI, err := url.Parse(uri)
require.NoError(t, err)

codecConfig := NewConfig(config.ProtocolCanalJSON)
err = codecConfig.Apply(sinkURI, replicaConfig)
require.NoError(t, err)
require.False(t, codecConfig.LargeMessageHandle.HandleKeyOnly())
}
6 changes: 6 additions & 0 deletions pkg/config/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,10 +500,16 @@ func (c *LargeMessageHandleConfig) Validate(protocol Protocol, enableTiDBExtensi

// HandleKeyOnly returns true if handle large message by encoding handle key only.
func (c *LargeMessageHandleConfig) HandleKeyOnly() bool {
if c == nil {
return false
}
return c.LargeMessageHandleOption == LargeMessageHandleOptionHandleKeyOnly
}

// Disabled returns true if disable large message handle.
func (c *LargeMessageHandleConfig) Disabled() bool {
if c == nil {
return true
}
return c.LargeMessageHandleOption == LargeMessageHandleOptionNone
}

0 comments on commit 2ab025e

Please sign in to comment.