Skip to content

Commit

Permalink
sink(ticdc): pass client and admin client
Browse files Browse the repository at this point in the history
  • Loading branch information
Rustin170506 committed Mar 9, 2022
1 parent 301ff7d commit 8c51e5b
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 17 deletions.
2 changes: 1 addition & 1 deletion cdc/sink/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL,
return nil, cerror.WrapError(cerror.ErrKafkaCreateTopic, err)
}

sProducer, err := kafka.NewKafkaSaramaProducer(ctx, topic, baseConfig, saramaConfig, errCh)
sProducer, err := kafka.NewKafkaSaramaProducer(ctx, client, adminClient, topic, baseConfig, saramaConfig, errCh)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
24 changes: 11 additions & 13 deletions cdc/sink/producer/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ type kafkaSaramaProducer struct {
// clientLock is used to protect concurrent access of asyncProducer and syncProducer.
// Since we don't close these two clients (which have an input chan) from the
// sender routine, data race or send on closed chan could happen.
clientLock sync.RWMutex
clientLock sync.RWMutex
// This admin mainly used by `metricsMonitor` to fetch broker info.
admin kafka.ClusterAdminClient
client sarama.Client
asyncProducer sarama.AsyncProducer
Expand Down Expand Up @@ -317,23 +318,20 @@ func (k *kafkaSaramaProducer) run(ctx context.Context) error {
var NewAdminClientImpl kafka.ClusterAdminClientCreator = kafka.NewSaramaAdminClient

// NewKafkaSaramaProducer creates a kafka sarama producer
func NewKafkaSaramaProducer(ctx context.Context, topic string, config *Config, saramaConfig *sarama.Config, errCh chan error) (*kafkaSaramaProducer, error) {
func NewKafkaSaramaProducer(
ctx context.Context,
client sarama.Client,
admin kafka.ClusterAdminClient,
topic string,
config *Config,
saramaConfig *sarama.Config,
errCh chan error,
) (*kafkaSaramaProducer, error) {
changefeedID := util.ChangefeedIDFromCtx(ctx)
role := util.RoleFromCtx(ctx)
log.Info("Starting kafka sarama producer ...", zap.Any("config", config),
zap.String("changefeed", changefeedID), zap.Any("role", role))

// this admin mainly used by `metricsMonitor` to fetch broker info.
admin, err := NewAdminClientImpl(config.BrokerEndpoints, saramaConfig)
if err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}

client, err := sarama.NewClient(config.BrokerEndpoints, saramaConfig)
if err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}

asyncProducer, err := sarama.NewAsyncProducerFromClient(client)
if err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
Expand Down
18 changes: 15 additions & 3 deletions cdc/sink/producer/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,11 @@ func (s *kafkaSuite) TestNewSaramaProducer(c *check.C) {
saramaConfig, err := NewSaramaConfig(ctx, config)
c.Assert(err, check.IsNil)
saramaConfig.Producer.Flush.MaxMessages = 1
producer, err := NewKafkaSaramaProducer(ctx, topic, config, saramaConfig, errCh)
client, err := sarama.NewClient(config.BrokerEndpoints, saramaConfig)
c.Assert(err, check.IsNil)
adminClient, err := NewAdminClientImpl(config.BrokerEndpoints, saramaConfig)
c.Assert(err, check.IsNil)
producer, err := NewKafkaSaramaProducer(ctx, client, adminClient, topic, config, saramaConfig, errCh)
c.Assert(err, check.IsNil)
c.Assert(producer.GetPartitionNum(), check.Equals, int32(2))

Expand Down Expand Up @@ -369,7 +373,11 @@ func (s *kafkaSuite) TestProducerSendMessageFailed(c *check.C) {
saramaConfig.Producer.Retry.Max = 2
saramaConfig.Producer.MaxMessageBytes = 8

producer, err := NewKafkaSaramaProducer(ctx, topic, config, saramaConfig, errCh)
client, err := sarama.NewClient(config.BrokerEndpoints, saramaConfig)
c.Assert(err, check.IsNil)
adminClient, err := NewAdminClientImpl(config.BrokerEndpoints, saramaConfig)
c.Assert(err, check.IsNil)
producer, err := NewKafkaSaramaProducer(ctx, client, adminClient, topic, config, saramaConfig, errCh)
defer func() {
err := producer.Close()
c.Assert(err, check.IsNil)
Expand Down Expand Up @@ -439,7 +447,11 @@ func (s *kafkaSuite) TestProducerDoubleClose(c *check.C) {
ctx = util.PutRoleInCtx(ctx, util.RoleTester)
saramaConfig, err := NewSaramaConfig(context.Background(), config)
c.Assert(err, check.IsNil)
producer, err := NewKafkaSaramaProducer(ctx, topic, config, saramaConfig, errCh)
client, err := sarama.NewClient(config.BrokerEndpoints, saramaConfig)
c.Assert(err, check.IsNil)
adminClient, err := NewAdminClientImpl(config.BrokerEndpoints, saramaConfig)
c.Assert(err, check.IsNil)
producer, err := NewKafkaSaramaProducer(ctx, client, adminClient, topic, config, saramaConfig, errCh)
defer func() {
err := producer.Close()
c.Assert(err, check.IsNil)
Expand Down

0 comments on commit 8c51e5b

Please sign in to comment.