Skip to content

Commit

Permalink
sink(cdc): adjust error in kafka producer adjust configuration logic. (
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand authored May 5, 2022
1 parent 3a24817 commit a2403ce
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 14 deletions.
2 changes: 1 addition & 1 deletion cdc/sink/mq/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ func NewKafkaSaramaSink(ctx context.Context, sinkURI *url.URL,
}

if err := kafka.AdjustConfig(adminClient, baseConfig, saramaConfig, topic); err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}

var protocol config.Protocol
Expand Down
2 changes: 2 additions & 0 deletions cdc/sink/mq/producer/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ func (c *Config) setPartitionNum(realPartitionCount int32) error {
// user does not specify the `partition-num` in the sink-uri
if c.PartitionNum == 0 {
c.PartitionNum = realPartitionCount
log.Info("partitionNum is not set, set by topic's partition-num",
zap.Int32("partitionNum", realPartitionCount))
return nil
}

Expand Down
22 changes: 12 additions & 10 deletions cdc/sink/mq/producer/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,14 +384,12 @@ func AdjustConfig(
) error {
topics, err := admin.ListTopics()
if err != nil {
return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
return errors.Trace(err)
}

err = validateMinInsyncReplicas(admin, topics, topic, int(config.ReplicationFactor))
if err != nil {
return cerror.ErrKafkaInvalidConfig.Wrap(err).GenWithStack(
"because TiCDC Kafka producer's `request.required.acks` defaults to -1, " +
"TiCDC cannot deliver messages when the `replication-factor` is less than `min.insync.replicas`")
return errors.Trace(err)
}

info, exists := topics[topic]
Expand All @@ -401,11 +399,11 @@ func AdjustConfig(
topicMaxMessageBytesStr, err := getTopicConfig(admin, info, kafka.TopicMaxMessageBytesConfigName,
kafka.BrokerMessageMaxBytesConfigName)
if err != nil {
return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
return errors.Trace(err)
}
topicMaxMessageBytes, err := strconv.Atoi(topicMaxMessageBytesStr)
if err != nil {
return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
return errors.Trace(err)
}

if topicMaxMessageBytes < config.MaxMessageBytes {
Expand Down Expand Up @@ -436,7 +434,7 @@ func AdjustConfig(
}
brokerMessageMaxBytes, err := strconv.Atoi(brokerMessageMaxBytesStr)
if err != nil {
return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
return errors.Trace(err)
}

// when create the topic, `max.message.bytes` is decided by the broker,
Expand Down Expand Up @@ -501,9 +499,13 @@ func validateMinInsyncReplicas(
if replicationFactor < minInsyncReplicas {
msg := fmt.Sprintf("`replication-factor` cannot be smaller than the `%s` of %s",
kafka.MinInsyncReplicasConfigName, configFrom)
log.Error(msg, zap.Int("replicationFactor", replicationFactor),
zap.Int("minInsyncReplicas", minInsyncReplicas))
return errors.New(msg)
log.Error(msg, zap.Int("replication-factor", replicationFactor),
zap.Int("min.insync.replicas", minInsyncReplicas))
return cerror.ErrKafkaInvalidConfig.GenWithStack(
"TiCDC Kafka producer's `request.required.acks` defaults to -1, "+
"TiCDC cannot deliver messages when the `replication-factor` "+
"is smaller than the `min.insync.replicas` of %s", configFrom,
)
}

return nil
Expand Down
20 changes: 18 additions & 2 deletions cdc/sink/mq/producer/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,18 +306,34 @@ func TestAdjustConfigMinInsyncReplicas(t *testing.T) {
err = AdjustConfig(adminClient, config, saramaConfig, "create-new-fail-invalid-min-insync-replicas")
require.Regexp(
t,
".*`replication-factor` cannot be smaller than the `min.insync.replicas` of broker.*",
".*`replication-factor` is smaller than the `min.insync.replicas` of broker.*",
errors.Cause(err),
)

// topic not exist, and `min.insync.replicas` not found in broker's configuration
adminClient.DropBrokerConfig()
err = AdjustConfig(adminClient, config, saramaConfig, "no-topic-no-min-insync-replicas")
require.Regexp(t, ".*cannot find the `min.insync.replicas` from the broker's configuration",
errors.Cause(err))

// Report an error if the replication-factor is less than min.insync.replicas
// when the topic does exist.
saramaConfig, err = NewSaramaConfig(context.Background(), config)
require.Nil(t, err)

// topic exist, but `min.insync.replicas` not found in topic and broker configuration
topicName := "topic-no-config-entry"
err = adminClient.CreateTopic(topicName, &sarama.TopicDetail{}, false)
require.Nil(t, err)
err = AdjustConfig(adminClient, config, saramaConfig, topicName)
require.Regexp(t, ".*cannot find the `min.insync.replicas` from the broker's configuration",
errors.Cause(err))

// topic found, and have `min.insync.replicas`, but set to 2, larger than `replication-factor`.
adminClient.SetMinInsyncReplicas("2")
err = AdjustConfig(adminClient, config, saramaConfig, adminClient.GetDefaultMockTopicName())
require.Regexp(t,
".*`replication-factor` cannot be smaller than the `min.insync.replicas` of topic.*",
".*`replication-factor` is smaller than the `min.insync.replicas` of topic.*",
errors.Cause(err),
)
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/kafka/cluster_admin_client_mock_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,8 @@ func (c *ClusterAdminClientMockImpl) GetTopicMaxMessageBytes() int {
maxMessageBytes, _ := strconv.Atoi(TopicMaxMessageBytes)
return maxMessageBytes
}

// DropBrokerConfig remove all broker level configuration for test purpose.
func (c *ClusterAdminClientMockImpl) DropBrokerConfig() {
c.brokerConfigs = c.brokerConfigs[:0]
}
2 changes: 1 addition & 1 deletion scripts/check-log-style.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
grep -RnE "zap.[A-Z][a-zA-Z0-9]+\(\"[0-9A-Za-z]*[-_ ][^\"]*\"(,|\))" cdc tests pkg |
grep -vE "user-agent" |
grep -vE "https_proxy|http_proxy|no_proxy" |
grep -vE "max-message-bytes|max-message-size" |
grep -vE "max-message-bytes|max-message-size|replication-factor" |
grep -vE "release-version|git-hash|git-branch|go-version" |
grep -vE "failpoint-build|utc-build-time" |
awk '{ print } END { if (NR > 0) { exit 1 } }'

0 comments on commit a2403ce

Please sign in to comment.