diff --git a/cdc/sink/dispatcher/event_router.go b/cdc/sink/dispatcher/event_router.go index b7491a2a32d..767d166de35 100644 --- a/cdc/sink/dispatcher/event_router.go +++ b/cdc/sink/dispatcher/event_router.go @@ -107,13 +107,23 @@ func NewEventRouter(cfg *config.ReplicaConfig, defaultTopic string) (*EventRoute }, nil } -// DispatchRowChangedEvent returns the target topic and partition. -func (s *EventRouter) DispatchRowChangedEvent(row *model.RowChangedEvent, partitionNum int32) (string, int32) { - topicDispatcher, partitionDispatcher := s.matchDispatcher(row.Table.Schema, row.Table.Table) - topic := topicDispatcher.DispatchRowChangedEvent(row) - partition := partitionDispatcher.DispatchRowChangedEvent(row, partitionNum) +// GetTopic returns the target topic. +func (s *EventRouter) GetTopic(row *model.RowChangedEvent) string { + topicDispatcher, _ := s.matchDispatcher(row.Table.Schema, row.Table.Table) + return topicDispatcher.DispatchRowChangedEvent(row) +} - return topic, partition +// GetPartition returns the target partition. +func (s *EventRouter) GetPartition( + row *model.RowChangedEvent, + partitionNum int32, +) int32 { + _, partitionDispatcher := s.matchDispatcher(row.Table.Schema, + row.Table.Table) + partition := partitionDispatcher.DispatchRowChangedEvent(row, + partitionNum) + + return partition } // GetActiveTopics returns a list of the corresponding topics diff --git a/cdc/sink/dispatcher/event_router_test.go b/cdc/sink/dispatcher/event_router_test.go index be19ff21f9b..7d26ca71d16 100644 --- a/cdc/sink/dispatcher/event_router_test.go +++ b/cdc/sink/dispatcher/event_router_test.go @@ -35,12 +35,34 @@ func TestEventRouter(t *testing.T) { d, err = NewEventRouter(&config.ReplicaConfig{ Sink: &config.SinkConfig{ DispatchRules: []*config.DispatchRule{ - {Matcher: []string{"test_default1.*"}, PartitionRule: "default"}, - {Matcher: []string{"test_default2.*"}, PartitionRule: "unknown-dispatcher"}, - {Matcher: []string{"test_table.*"}, PartitionRule: "table", TopicRule: "hello_{schema}_world"}, - {Matcher: []string{"test_index_value.*"}, PartitionRule: "index-value", TopicRule: "{schema}_world"}, - {Matcher: []string{"test.*"}, PartitionRule: "rowid", TopicRule: "hello_{schema}"}, - {Matcher: []string{"*.*", "!*.test"}, PartitionRule: "ts", TopicRule: "{schema}_{table}"}, + { + Matcher: []string{"test_default1.*"}, + PartitionRule: "default", + }, + { + Matcher: []string{"test_default2.*"}, + PartitionRule: "unknown-dispatcher", + }, + { + Matcher: []string{"test_table.*"}, + PartitionRule: "table", + TopicRule: "hello_{schema}_world", + }, + { + Matcher: []string{"test_index_value.*"}, + PartitionRule: "index-value", + TopicRule: "{schema}_world", + }, + { + Matcher: []string{"test.*"}, + PartitionRule: "rowid", + TopicRule: "hello_{schema}", + }, + { + Matcher: []string{"*.*", "!*.test"}, + PartitionRule: "ts", + TopicRule: "{schema}_{table}", + }, }, }, }, "") @@ -80,12 +102,34 @@ func TestGetActiveTopics(t *testing.T) { d, err := NewEventRouter(&config.ReplicaConfig{ Sink: &config.SinkConfig{ DispatchRules: []*config.DispatchRule{ - {Matcher: []string{"test_default1.*"}, PartitionRule: "default"}, - {Matcher: []string{"test_default2.*"}, PartitionRule: "unknown-dispatcher"}, - {Matcher: []string{"test_table.*"}, PartitionRule: "table", TopicRule: "hello_{schema}_world"}, - {Matcher: []string{"test_index_value.*"}, PartitionRule: "index-value", TopicRule: "{schema}_world"}, - {Matcher: []string{"test.*"}, PartitionRule: "rowid", TopicRule: "hello_{schema}"}, - {Matcher: 
[]string{"*.*", "!*.test"}, PartitionRule: "ts", TopicRule: "{schema}_{table}"}, + { + Matcher: []string{"test_default1.*"}, + PartitionRule: "default", + }, + { + Matcher: []string{"test_default2.*"}, + PartitionRule: "unknown-dispatcher", + }, + { + Matcher: []string{"test_table.*"}, + PartitionRule: "table", + TopicRule: "hello_{schema}_world", + }, + { + Matcher: []string{"test_index_value.*"}, + PartitionRule: "index-value", + TopicRule: "{schema}_world", + }, + { + Matcher: []string{"test.*"}, + PartitionRule: "rowid", + TopicRule: "hello_{schema}", + }, + { + Matcher: []string{"*.*", "!*.test"}, + PartitionRule: "ts", + TopicRule: "{schema}_{table}", + }, }, }, }, "test") @@ -101,3 +145,155 @@ func TestGetActiveTopics(t *testing.T) { topics := d.GetActiveTopics(names) require.Equal(t, []string{"test", "hello_test_table_world", "test_index_value_world", "hello_test", "sbs_table"}, topics) } + +func TestGetTopic(t *testing.T) { + t.Parallel() + + d, err := NewEventRouter(&config.ReplicaConfig{ + Sink: &config.SinkConfig{ + DispatchRules: []*config.DispatchRule{ + { + Matcher: []string{"test_default1.*"}, + PartitionRule: "default", + }, + { + Matcher: []string{"test_default2.*"}, + PartitionRule: "unknown-dispatcher", + }, + { + Matcher: []string{"test_table.*"}, + PartitionRule: "table", + TopicRule: "hello_{schema}_world", + }, + { + Matcher: []string{"test_index_value.*"}, + PartitionRule: "index-value", + TopicRule: "{schema}_world", + }, + { + Matcher: []string{"test.*"}, + PartitionRule: "rowid", + TopicRule: "hello_{schema}", + }, + { + Matcher: []string{"*.*", "!*.test"}, + PartitionRule: "ts", + TopicRule: "{schema}_{table}", + }, + }, + }, + }, "test") + require.Nil(t, err) + + topicName := d.GetTopic(&model.RowChangedEvent{ + Table: &model.TableName{Schema: "test_default1", Table: "table"}, + }) + require.Equal(t, "test", topicName) + topicName = d.GetTopic(&model.RowChangedEvent{ + Table: &model.TableName{Schema: "test_default2", Table: "table"}, + }) + require.Equal(t, "test", topicName) + topicName = d.GetTopic(&model.RowChangedEvent{ + Table: &model.TableName{Schema: "test_table", Table: "table"}, + }) + require.Equal(t, "hello_test_table_world", topicName) + topicName = d.GetTopic(&model.RowChangedEvent{ + Table: &model.TableName{Schema: "test_index_value", Table: "table"}, + }) + require.Equal(t, "test_index_value_world", topicName) + topicName = d.GetTopic(&model.RowChangedEvent{ + Table: &model.TableName{Schema: "a", Table: "table"}, + }) + require.Equal(t, "a_table", topicName) +} + +func TestGetPartition(t *testing.T) { + t.Parallel() + + d, err := NewEventRouter(&config.ReplicaConfig{ + Sink: &config.SinkConfig{ + DispatchRules: []*config.DispatchRule{ + { + Matcher: []string{"test_default1.*"}, + PartitionRule: "default", + }, + { + Matcher: []string{"test_default2.*"}, + PartitionRule: "unknown-dispatcher", + }, + { + Matcher: []string{"test_table.*"}, + PartitionRule: "table", + TopicRule: "hello_{schema}_world", + }, + { + Matcher: []string{"test_index_value.*"}, + PartitionRule: "index-value", + TopicRule: "{schema}_world", + }, + { + Matcher: []string{"test.*"}, + PartitionRule: "rowid", + TopicRule: "hello_{schema}", + }, + { + Matcher: []string{"*.*", "!*.test"}, + PartitionRule: "ts", + TopicRule: "{schema}_{table}", + }, + }, + }, + }, "test") + require.Nil(t, err) + + p := d.GetPartition(&model.RowChangedEvent{ + Table: &model.TableName{Schema: "test_default1", Table: "table"}, + Columns: []*model.Column{ + { + Name: "id", + Value: 1, + Flag: 
model.HandleKeyFlag | model.PrimaryKeyFlag, + }, + }, + IndexColumns: [][]int{{0}}, + }, 16) + require.Equal(t, int32(10), p) + p = d.GetPartition(&model.RowChangedEvent{ + Table: &model.TableName{Schema: "test_default2", Table: "table"}, + Columns: []*model.Column{ + { + Name: "id", + Value: 1, + Flag: model.HandleKeyFlag | model.PrimaryKeyFlag, + }, + }, + IndexColumns: [][]int{{0}}, + }, 16) + require.Equal(t, int32(4), p) + + p = d.GetPartition(&model.RowChangedEvent{ + Table: &model.TableName{Schema: "test_table", Table: "table"}, + CommitTs: 1, + }, 16) + require.Equal(t, int32(15), p) + p = d.GetPartition(&model.RowChangedEvent{ + Table: &model.TableName{Schema: "test_index_value", Table: "table"}, + Columns: []*model.Column{ + { + Name: "a", + Value: 11, + Flag: model.HandleKeyFlag, + }, { + Name: "b", + Value: 22, + Flag: 0, + }, + }, + }, 10) + require.Equal(t, int32(1), p) + p = d.GetPartition(&model.RowChangedEvent{ + Table: &model.TableName{Schema: "a", Table: "table"}, + CommitTs: 1, + }, 2) + require.Equal(t, int32(1), p) +} diff --git a/cdc/sink/manager/kafka/manager.go b/cdc/sink/manager/kafka/manager.go new file mode 100644 index 00000000000..18473f911ba --- /dev/null +++ b/cdc/sink/manager/kafka/manager.go @@ -0,0 +1,177 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +import ( + "fmt" + "sync" + "time" + + "github.com/Shopify/sarama" + "github.com/pingcap/errors" + "github.com/pingcap/log" + kafkaconfig "github.com/pingcap/tiflow/cdc/sink/producer/kafka" + cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/kafka" + "go.uber.org/atomic" + "go.uber.org/zap" +) + +// TopicManager is a manager for kafka topics. +type TopicManager struct { + client kafka.Client + admin kafka.ClusterAdminClient + + cfg *kafkaconfig.AutoCreateTopicConfig + + topics sync.Map + + lastMetadataRefresh atomic.Int64 +} + +// NewTopicManager creates a new topic manager. +func NewTopicManager( + client kafka.Client, + admin kafka.ClusterAdminClient, + cfg *kafkaconfig.AutoCreateTopicConfig, +) *TopicManager { + return &TopicManager{ + client: client, + admin: admin, + cfg: cfg, + } +} + +// Partitions returns the number of partitions of the topic. +// It may also try to update the topics' information maintained by manager. +func (m *TopicManager) Partitions(topic string) (int32, error) { + err := m.tryRefreshMeta() + if err != nil { + return 0, errors.Trace(err) + } + + if partitions, ok := m.topics.Load(topic); ok { + return partitions.(int32), nil + } + + return m.cfg.PartitionNum, m.CreateTopic(topic) +} + +// tryRefreshMeta try to refresh the topics' information maintained by manager. 
+func (m *TopicManager) tryRefreshMeta() error {
+	if time.Since(time.Unix(m.lastMetadataRefresh.Load(), 0)) > time.Minute {
+		topics, err := m.client.Topics()
+		if err != nil {
+			return err
+		}
+
+		for _, topic := range topics {
+			partitions, err := m.client.Partitions(topic)
+			if err != nil {
+				return err
+			}
+			m.tryUpdatePartitionsAndLogging(topic, int32(len(partitions)))
+		}
+		m.lastMetadataRefresh.Store(time.Now().Unix())
+	}
+
+	return nil
+}
+
+// tryUpdatePartitionsAndLogging tries to update the partitions of the topic.
+func (m *TopicManager) tryUpdatePartitionsAndLogging(topic string, partitions int32) {
+	oldPartitions, ok := m.topics.Load(topic)
+	if ok {
+		if oldPartitions.(int32) != partitions {
+			m.topics.Store(topic, partitions)
+			log.Info(
+				"update topic partition number",
+				zap.String("topic", topic),
+				zap.Int32("oldPartitionNumber", oldPartitions.(int32)),
+				zap.Int32("newPartitionNumber", partitions),
+			)
+		}
+	} else {
+		m.topics.Store(topic, partitions)
+		log.Info(
+			"store topic partition number",
+			zap.String("topic", topic),
+			zap.Int32("partitionNumber", partitions),
+		)
+	}
+}
+
+// CreateTopic creates a topic with the given name.
+func (m *TopicManager) CreateTopic(topicName string) error {
+	start := time.Now()
+	topics, err := m.admin.ListTopics()
+	if err != nil {
+		log.Error(
+			"Kafka admin client list topics failed",
+			zap.Error(err),
+			zap.Duration("cost", time.Since(start)),
+		)
+		return errors.Trace(err)
+	}
+	log.Info(
+		"Kafka admin client list topics success",
+		zap.Duration("cost", time.Since(start)),
+	)
+
+	// Now that we have access to the latest topics' information,
+	// we need to update it here immediately.
+	for topic, detail := range topics {
+		m.tryUpdatePartitionsAndLogging(topic, detail.NumPartitions)
+	}
+	m.lastMetadataRefresh.Store(time.Now().Unix())
+
+	// The topic already exists (our cached information may have been stale), so there is nothing to create.
+	if _, ok := topics[topicName]; ok {
+		return nil
+	}
+
+	if !m.cfg.AutoCreate {
+		return cerror.ErrKafkaInvalidConfig.GenWithStack(
+			fmt.Sprintf("`auto-create-topic` is false, "+
+				"and %s not found", topicName))
+	}
+
+	start = time.Now()
+	err = m.admin.CreateTopic(topicName, &sarama.TopicDetail{
+		NumPartitions:     m.cfg.PartitionNum,
+		ReplicationFactor: m.cfg.ReplicationFactor,
+	}, false)
+	// Ignore the error if the topic already exists.
+	if err != nil && errors.Cause(err) != sarama.ErrTopicAlreadyExists {
+		log.Error(
+			"Kafka admin client create the topic failed",
+			zap.String("topic", topicName),
+			zap.Int32("partitionNumber", m.cfg.PartitionNum),
+			zap.Int16("replicationFactor", m.cfg.ReplicationFactor),
+			zap.Error(err),
+			zap.Duration("duration", time.Since(start)),
+		)
+		return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
+	}
+	log.Info(
+		"Kafka admin client create the topic success",
+		zap.String("topic", topicName),
+		zap.Int32("partitionNumber", m.cfg.PartitionNum),
+		zap.Int16("replicationFactor", m.cfg.ReplicationFactor),
+		zap.Duration("duration", time.Since(start)),
+	)
+	m.tryUpdatePartitionsAndLogging(topicName, m.cfg.PartitionNum)
+
+	return nil
+}
diff --git a/cdc/sink/manager/kafka/manager_test.go b/cdc/sink/manager/kafka/manager_test.go
new file mode 100644
index 00000000000..03e46c003e0
--- /dev/null
+++ b/cdc/sink/manager/kafka/manager_test.go
@@ -0,0 +1,112 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +import ( + "testing" + "time" + + kafkaconfig "github.com/pingcap/tiflow/cdc/sink/producer/kafka" + kafkamock "github.com/pingcap/tiflow/pkg/kafka" + "github.com/stretchr/testify/require" +) + +func TestPartitions(t *testing.T) { + t.Parallel() + + client := kafkamock.NewClientMockImpl() + adminClient := kafkamock.NewClusterAdminClientMockImpl() + defer func(adminClient *kafkamock.ClusterAdminClientMockImpl) { + _ = adminClient.Close() + }(adminClient) + cfg := &kafkaconfig.AutoCreateTopicConfig{ + AutoCreate: true, + PartitionNum: 2, + ReplicationFactor: 1, + } + + manager := NewTopicManager(client, adminClient, cfg) + partitionsNum, err := manager.Partitions(kafkamock.DefaultMockTopicName) + require.Nil(t, err) + require.Equal(t, int32(3), partitionsNum) +} + +func TestTryRefreshMeta(t *testing.T) { + t.Parallel() + + client := kafkamock.NewClientMockImpl() + adminClient := kafkamock.NewClusterAdminClientMockImpl() + defer func(adminClient *kafkamock.ClusterAdminClientMockImpl) { + _ = adminClient.Close() + }(adminClient) + cfg := &kafkaconfig.AutoCreateTopicConfig{ + AutoCreate: true, + PartitionNum: 2, + ReplicationFactor: 1, + } + + manager := NewTopicManager(client, adminClient, cfg) + partitionsNum, err := manager.Partitions(kafkamock.DefaultMockTopicName) + require.Nil(t, err) + require.Equal(t, int32(3), partitionsNum) + + // Mock create a topic. + client.AddTopic("test", 4) + manager.lastMetadataRefresh.Store(time.Now().Add(-2 * time.Minute).Unix()) + partitionsNum, err = manager.Partitions("test") + require.Nil(t, err) + require.Equal(t, int32(4), partitionsNum) + + // Mock delete a topic. + // NOTICE: we do not refresh metadata for the deleted topic. + client.DeleteTopic("test") + partitionsNum, err = manager.Partitions("test") + require.Nil(t, err) + require.Equal(t, int32(4), partitionsNum) +} + +func TestCreateTopic(t *testing.T) { + t.Parallel() + + client := kafkamock.NewClientMockImpl() + adminClient := kafkamock.NewClusterAdminClientMockImpl() + defer func(adminClient *kafkamock.ClusterAdminClientMockImpl) { + _ = adminClient.Close() + }(adminClient) + cfg := &kafkaconfig.AutoCreateTopicConfig{ + AutoCreate: true, + PartitionNum: 2, + ReplicationFactor: 1, + } + + manager := NewTopicManager(client, adminClient, cfg) + err := manager.CreateTopic(kafkamock.DefaultMockTopicName) + require.Nil(t, err) + + err = manager.CreateTopic("new-topic") + require.Nil(t, err) + partitionsNum, err := manager.Partitions("new-topic") + require.Nil(t, err) + require.Equal(t, int32(2), partitionsNum) + + // Try to create a topic without auto create. + cfg.AutoCreate = false + manager = NewTopicManager(client, adminClient, cfg) + err = manager.CreateTopic("new-topic2") + require.Regexp( + t, + "`auto-create-topic` is false, and new-topic2 not found", + err, + ) +} diff --git a/cdc/sink/manager/manager.go b/cdc/sink/manager/manager.go new file mode 100644 index 00000000000..5556007e1bd --- /dev/null +++ b/cdc/sink/manager/manager.go @@ -0,0 +1,24 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package manager
+
+// TopicManager is the interface of a topic manager.
+// It is responsible for creating topics and
+// keeping their partition information up to date.
+type TopicManager interface {
+	// Partitions returns the number of partitions of the topic.
+	Partitions(topic string) (int32, error)
+	// CreateTopic creates the topic.
+	CreateTopic(topicName string) error
+}
diff --git a/cdc/sink/manager/pulsar/manager.go b/cdc/sink/manager/pulsar/manager.go
new file mode 100644
index 00000000000..2c4846e7351
--- /dev/null
+++ b/cdc/sink/manager/pulsar/manager.go
@@ -0,0 +1,40 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package pulsar
+
+// TopicManager is a manager for Pulsar topics.
+// For now it is only a placeholder:
+// the Pulsar sink does not support multiple topics yet,
+// so it simply returns a fixed number of partitions
+// for a fixed topic.
+type TopicManager struct {
+	partitionNum int32
+}
+
+// NewTopicManager creates a new TopicManager.
+func NewTopicManager(partitionNum int32) *TopicManager {
+	return &TopicManager{
+		partitionNum: partitionNum,
+	}
+}
+
+// Partitions returns the number of partitions of the topic.
+func (m *TopicManager) Partitions(_ string) (int32, error) {
+	return m.partitionNum, nil
+}
+
+// CreateTopic does nothing.
+func (m *TopicManager) CreateTopic(_ string) error {
+	return nil
+}
diff --git a/cdc/sink/mq.go b/cdc/sink/mq.go
index 6e33d1d007c..cbe3cb35d47 100644
--- a/cdc/sink/mq.go
+++ b/cdc/sink/mq.go
@@ -19,11 +19,15 @@ import (
 	"strings"
 	"sync"
 
+	"github.com/Shopify/sarama"
 	"github.com/pingcap/errors"
 	"github.com/pingcap/log"
 	"github.com/pingcap/tiflow/cdc/model"
 	"github.com/pingcap/tiflow/cdc/sink/codec"
 	"github.com/pingcap/tiflow/cdc/sink/dispatcher"
+	"github.com/pingcap/tiflow/cdc/sink/manager"
+	kafkamanager "github.com/pingcap/tiflow/cdc/sink/manager/kafka"
+	pulsarmanager "github.com/pingcap/tiflow/cdc/sink/manager/pulsar"
 	"github.com/pingcap/tiflow/cdc/sink/producer"
 	"github.com/pingcap/tiflow/cdc/sink/producer/kafka"
 	"github.com/pingcap/tiflow/cdc/sink/producer/pulsar"
@@ -56,8 +60,7 @@ type mqSink struct {
 	filter   *filter.Filter
 	protocol config.Protocol
 
-	// TODO(hi-rustin): remove this after topic manager is ready.
- partitionNum int32 + topicManager manager.TopicManager flushWorker *flushWorker tableCheckpointTsMap sync.Map resolvedBuffer chan resolvedTsEvent @@ -69,9 +72,14 @@ type mqSink struct { } func newMqSink( - ctx context.Context, credential *security.Credential, mqProducer producer.Producer, - filter *filter.Filter, defaultTopic string, replicaConfig *config.ReplicaConfig, - encoderConfig *codec.Config, errCh chan error, + ctx context.Context, + credential *security.Credential, + topicManager manager.TopicManager, + mqProducer producer.Producer, + filter *filter.Filter, + defaultTopic string, + replicaConfig *config.ReplicaConfig, encoderConfig *codec.Config, + errCh chan error, ) (*mqSink, error) { encoderBuilder, err := codec.NewEventBatchEncoderBuilder(encoderConfig, credential) if err != nil { @@ -99,7 +107,7 @@ func newMqSink( encoderBuilder: encoderBuilder, filter: filter, protocol: encoderConfig.Protocol(), - partitionNum: mqProducer.GetPartitionNum(), + topicManager: topicManager, flushWorker: flushWorker, resolvedBuffer: make(chan resolvedTsEvent, defaultResolvedTsEventBufferSize), statistics: statistics, @@ -143,7 +151,12 @@ func (k *mqSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowCha zap.Any("role", k.role)) continue } - _, partition := k.eventRouter.DispatchRowChangedEvent(row, k.partitionNum) + topic := k.eventRouter.GetTopic(row) + partitionNum, err := k.topicManager.Partitions(topic) + if err != nil { + return errors.Trace(err) + } + partition := k.eventRouter.GetPartition(row, partitionNum) select { case <-ctx.Done(): return ctx.Err() @@ -324,11 +337,6 @@ func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, if err != nil { return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) } - defer func() { - if err := adminClient.Close(); err != nil { - log.Warn("close admin client error", zap.Error(err)) - } - }() if err := kafka.AdjustConfig(adminClient, baseConfig, saramaConfig, topic); err != nil { return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) @@ -351,17 +359,44 @@ func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) } - topicConfig := baseConfig.DeriveTopicConfig(topic) - if err := kafka.CreateTopic(adminClient, topicConfig); err != nil { + client, err := sarama.NewClient(baseConfig.BrokerEndpoints, saramaConfig) + if err != nil { + return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) + } + + topicManager := kafkamanager.NewTopicManager( + client, + adminClient, + baseConfig.DeriveTopicConfig(), + ) + if err := topicManager.CreateTopic(topic); err != nil { return nil, cerror.WrapError(cerror.ErrKafkaCreateTopic, err) } - sProducer, err := kafka.NewKafkaSaramaProducer(ctx, topic, baseConfig, saramaConfig, errCh) + sProducer, err := kafka.NewKafkaSaramaProducer( + ctx, + client, + adminClient, + topic, + baseConfig, + saramaConfig, + errCh, + ) if err != nil { return nil, errors.Trace(err) } - sink, err := newMqSink(ctx, baseConfig.Credential, sProducer, filter, topic, replicaConfig, encoderConfig, errCh) + sink, err := newMqSink( + ctx, + baseConfig.Credential, + topicManager, + sProducer, + filter, + topic, + replicaConfig, + encoderConfig, + errCh, + ) if err != nil { return nil, errors.Trace(err) } @@ -401,7 +436,20 @@ func newPulsarSink(ctx context.Context, sinkURI *url.URL, filter *filter.Filter, // For now, it's a placeholder. Avro format have to make connection to Schema Registry, // and it may need credential. 
credential := &security.Credential{} - sink, err := newMqSink(ctx, credential, producer, filter, "", replicaConfig, encoderConfig, errCh) + fakeTopicManager := pulsarmanager.NewTopicManager( + producer.GetPartitionNum(), + ) + sink, err := newMqSink( + ctx, + credential, + fakeTopicManager, + producer, + filter, + "", + replicaConfig, + encoderConfig, + errCh, + ) if err != nil { return nil, errors.Trace(err) } diff --git a/cdc/sink/producer/kafka/config.go b/cdc/sink/producer/kafka/config.go index e613ede6464..0d7dc06e03b 100644 --- a/cdc/sink/producer/kafka/config.go +++ b/cdc/sink/producer/kafka/config.go @@ -97,20 +97,19 @@ func (c *Config) setPartitionNum(realPartitionCount int32) error { return nil } -type topicConfig struct { - autoCreate bool - name string - partitionNum int32 - replicationFactor int16 +// AutoCreateTopicConfig is used to create topic configuration. +type AutoCreateTopicConfig struct { + AutoCreate bool + PartitionNum int32 + ReplicationFactor int16 } // DeriveTopicConfig derive a `topicConfig` from the `Config` -func (c *Config) DeriveTopicConfig(topicName string) *topicConfig { - return &topicConfig{ - name: topicName, - autoCreate: c.AutoCreate, - partitionNum: c.PartitionNum, - replicationFactor: c.ReplicationFactor, +func (c *Config) DeriveTopicConfig() *AutoCreateTopicConfig { + return &AutoCreateTopicConfig{ + AutoCreate: c.AutoCreate, + PartitionNum: c.PartitionNum, + ReplicationFactor: c.ReplicationFactor, } } diff --git a/cdc/sink/producer/kafka/kafka.go b/cdc/sink/producer/kafka/kafka.go index cec861bbc85..21b4434cd4a 100644 --- a/cdc/sink/producer/kafka/kafka.go +++ b/cdc/sink/producer/kafka/kafka.go @@ -18,7 +18,6 @@ import ( "fmt" "regexp" "strconv" - "strings" "sync" "sync/atomic" "time" @@ -52,7 +51,8 @@ type kafkaSaramaProducer struct { // clientLock is used to protect concurrent access of asyncProducer and syncProducer. // Since we don't close these two clients (which have an input chan) from the // sender routine, data race or send on closed chan could happen. - clientLock sync.RWMutex + clientLock sync.RWMutex + // This admin mainly used by `metricsMonitor` to fetch broker info. admin kafka.ClusterAdminClient client sarama.Client asyncProducer sarama.AsyncProducer @@ -318,23 +318,20 @@ func (k *kafkaSaramaProducer) run(ctx context.Context) error { var NewAdminClientImpl kafka.ClusterAdminClientCreator = kafka.NewSaramaAdminClient // NewKafkaSaramaProducer creates a kafka sarama producer -func NewKafkaSaramaProducer(ctx context.Context, topic string, config *Config, saramaConfig *sarama.Config, errCh chan error) (*kafkaSaramaProducer, error) { +func NewKafkaSaramaProducer( + ctx context.Context, + client sarama.Client, + admin kafka.ClusterAdminClient, + topic string, + config *Config, + saramaConfig *sarama.Config, + errCh chan error, +) (*kafkaSaramaProducer, error) { changefeedID := util.ChangefeedIDFromCtx(ctx) role := util.RoleFromCtx(ctx) log.Info("Starting kafka sarama producer ...", zap.Any("config", config), zap.String("changefeed", changefeedID), zap.Any("role", role)) - // this admin mainly used by `metricsMonitor` to fetch broker info. 
- admin, err := NewAdminClientImpl(config.BrokerEndpoints, saramaConfig) - if err != nil { - return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) - } - - client, err := sarama.NewClient(config.BrokerEndpoints, saramaConfig) - if err != nil { - return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) - } - asyncProducer, err := sarama.NewAsyncProducerFromClient(client) if err != nil { return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) @@ -345,9 +342,6 @@ func NewKafkaSaramaProducer(ctx context.Context, topic string, config *Config, s return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) } - if err != nil { - return nil, err - } k := &kafkaSaramaProducer{ admin: admin, client: client, @@ -479,41 +473,6 @@ func AdjustConfig(admin kafka.ClusterAdminClient, config *Config, return nil } -// CreateTopic create the topic by `topicConfig`. -func CreateTopic(admin kafka.ClusterAdminClient, config *topicConfig) error { - topics, err := admin.ListTopics() - if err != nil { - return errors.Trace(err) - } - - if _, ok := topics[config.name]; ok { - // no need to create the topic, but we would have to log user if they found enter wrong topic name later - if config.autoCreate { - log.Warn("topic already exist, TiCDC will not create the topic", zap.String("topic", config.name)) - } - return nil - } - - if !config.autoCreate { - return cerror.ErrKafkaInvalidConfig.GenWithStack("`auto-create-topic` is false, and topic not found") - } - - err = admin.CreateTopic(config.name, &sarama.TopicDetail{ - NumPartitions: config.partitionNum, - ReplicationFactor: config.replicationFactor, - }, false) - // TODO identify the cause of "Topic with this name already exists" - if err != nil && !strings.Contains(err.Error(), "already exists") { - return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) - } - - log.Info("TiCDC create the topic", - zap.Int32("partition-num", config.partitionNum), - zap.Int16("replication-factor", config.replicationFactor)) - - return nil -} - func validateMinInsyncReplicas(admin kafka.ClusterAdminClient, topics map[string]sarama.TopicDetail, topic string, replicationFactor int) error { minInsyncReplicasConfigGetter := func() (string, bool, error) { diff --git a/cdc/sink/producer/kafka/kafka_test.go b/cdc/sink/producer/kafka/kafka_test.go index 7d31aa1c5ea..8ace76310c6 100644 --- a/cdc/sink/producer/kafka/kafka_test.go +++ b/cdc/sink/producer/kafka/kafka_test.go @@ -104,7 +104,19 @@ func (s *kafkaSuite) TestNewSaramaProducer(c *check.C) { saramaConfig, err := NewSaramaConfig(ctx, config) c.Assert(err, check.IsNil) saramaConfig.Producer.Flush.MaxMessages = 1 - producer, err := NewKafkaSaramaProducer(ctx, topic, config, saramaConfig, errCh) + client, err := sarama.NewClient(config.BrokerEndpoints, saramaConfig) + c.Assert(err, check.IsNil) + adminClient, err := NewAdminClientImpl(config.BrokerEndpoints, saramaConfig) + c.Assert(err, check.IsNil) + producer, err := NewKafkaSaramaProducer( + ctx, + client, + adminClient, + topic, + config, + saramaConfig, + errCh, + ) c.Assert(err, check.IsNil) c.Assert(producer.GetPartitionNum(), check.Equals, int32(2)) @@ -323,39 +335,6 @@ func (s *kafkaSuite) TestAdjustConfigMinInsyncReplicas(c *check.C) { ) } -func (s *kafkaSuite) TestCreateTopic(c *check.C) { - defer testleak.AfterTest(c)() - - adminClient := kafka.NewClusterAdminClientMockImpl() - defer func() { - _ = adminClient.Close() - }() - - // `auto-create-topic` enable, topic exist - config := NewConfig() - topicConfig := 
config.DeriveTopicConfig(adminClient.GetDefaultMockTopicName()) - err := CreateTopic(adminClient, topicConfig) - c.Assert(err, check.IsNil) - - // `auto-create-topic` enable, topic not exist - topicConfig = config.DeriveTopicConfig("not-exist-topic-1") - err = CreateTopic(adminClient, topicConfig) - c.Assert(err, check.IsNil) - - topics, err := adminClient.ListTopics() - c.Assert(err, check.IsNil) - _, ok := topics["not-exist-topic-1"] - c.Assert(ok, check.IsTrue) - - // `auto-create-topic` disable, topic not exist - config.AutoCreate = false - topicConfig = config.DeriveTopicConfig("not-exist-topic-2") - err = CreateTopic(adminClient, topicConfig) - c.Assert(errors.Cause(err), - check.ErrorMatches, - ".*auto-create-topic` is false, and topic not found.*") -} - func (s *kafkaSuite) TestCreateProducerFailed(c *check.C) { defer testleak.AfterTest(c)() config := NewConfig() @@ -402,7 +381,19 @@ func (s *kafkaSuite) TestProducerSendMessageFailed(c *check.C) { saramaConfig.Producer.Retry.Max = 2 saramaConfig.Producer.MaxMessageBytes = 8 - producer, err := NewKafkaSaramaProducer(ctx, topic, config, saramaConfig, errCh) + client, err := sarama.NewClient(config.BrokerEndpoints, saramaConfig) + c.Assert(err, check.IsNil) + adminClient, err := NewAdminClientImpl(config.BrokerEndpoints, saramaConfig) + c.Assert(err, check.IsNil) + producer, err := NewKafkaSaramaProducer( + ctx, + client, + adminClient, + topic, + config, + saramaConfig, + errCh, + ) defer func() { err := producer.Close() c.Assert(err, check.IsNil) @@ -472,7 +463,19 @@ func (s *kafkaSuite) TestProducerDoubleClose(c *check.C) { ctx = util.PutRoleInCtx(ctx, util.RoleTester) saramaConfig, err := NewSaramaConfig(context.Background(), config) c.Assert(err, check.IsNil) - producer, err := NewKafkaSaramaProducer(ctx, topic, config, saramaConfig, errCh) + client, err := sarama.NewClient(config.BrokerEndpoints, saramaConfig) + c.Assert(err, check.IsNil) + adminClient, err := NewAdminClientImpl(config.BrokerEndpoints, saramaConfig) + c.Assert(err, check.IsNil) + producer, err := NewKafkaSaramaProducer( + ctx, + client, + adminClient, + topic, + config, + saramaConfig, + errCh, + ) defer func() { err := producer.Close() c.Assert(err, check.IsNil) diff --git a/pkg/kafka/client.go b/pkg/kafka/client.go new file mode 100644 index 00000000000..b6b8bf24f89 --- /dev/null +++ b/pkg/kafka/client.go @@ -0,0 +1,24 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +// Client is a generic Kafka client. +type Client interface { + // Topics returns the set of available + // topics as retrieved from cluster metadata. + Topics() ([]string, error) + // Partitions returns the sorted list of + // all partition IDs for the given topic. + Partitions(topic string) ([]int32, error) +} diff --git a/pkg/kafka/client_mock_impl.go b/pkg/kafka/client_mock_impl.go new file mode 100644 index 00000000000..02935526a36 --- /dev/null +++ b/pkg/kafka/client_mock_impl.go @@ -0,0 +1,56 @@ +// Copyright 2022 PingCAP, Inc. 
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package kafka
+
+// ClientMockImpl is a mock implementation of the Client interface.
+type ClientMockImpl struct {
+	topics map[string][]int32
+}
+
+// NewClientMockImpl creates a new ClientMockImpl instance.
+func NewClientMockImpl() *ClientMockImpl {
+	topics := make(map[string][]int32)
+	topics[DefaultMockTopicName] = []int32{0, 1, 2}
+	return &ClientMockImpl{
+		topics: topics,
+	}
+}
+
+// Partitions returns the partitions of the given topic.
+func (c *ClientMockImpl) Partitions(topic string) ([]int32, error) {
+	return c.topics[topic], nil
+}
+
+// Topics returns all topics.
+func (c *ClientMockImpl) Topics() ([]string, error) {
+	var topics []string
+	for topic := range c.topics {
+		topics = append(topics, topic)
+	}
+	return topics, nil
+}
+
+// AddTopic adds a topic.
+func (c *ClientMockImpl) AddTopic(topicName string, partitions int32) {
+	p := make([]int32, partitions)
+	for i := int32(0); i < partitions; i++ {
+		p[i] = i
+	}
+	c.topics[topicName] = p
+}
+
+// DeleteTopic deletes a topic.
+func (c *ClientMockImpl) DeleteTopic(topicName string) {
+	delete(c.topics, topicName)
+}
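
For reference, the sketch below shows how the pieces introduced by this patch fit together: an EventRouter resolves the topic for a row, the TopicManager reports (and, if auto-create is enabled, creates) that topic and returns its partition count, and the router then picks a partition — the same per-row flow that EmitRowChangedEvents in cdc/sink/mq.go now follows. It is only an illustration built on the mock client and admin from pkg/kafka; the main wrapper, the example dispatch rule, and the "default-topic" name are assumptions for the sketch, not part of the patch.

package main

import (
	"fmt"

	"github.com/pingcap/tiflow/cdc/model"
	"github.com/pingcap/tiflow/cdc/sink/dispatcher"
	kafkamanager "github.com/pingcap/tiflow/cdc/sink/manager/kafka"
	kafkaconfig "github.com/pingcap/tiflow/cdc/sink/producer/kafka"
	"github.com/pingcap/tiflow/pkg/config"
	kafkamock "github.com/pingcap/tiflow/pkg/kafka"
)

func main() {
	// A topic manager backed by the in-memory mocks from pkg/kafka,
	// configured to auto-create missing topics with 2 partitions.
	topicManager := kafkamanager.NewTopicManager(
		kafkamock.NewClientMockImpl(),
		kafkamock.NewClusterAdminClientMockImpl(),
		&kafkaconfig.AutoCreateTopicConfig{
			AutoCreate:        true,
			PartitionNum:      2,
			ReplicationFactor: 1,
		},
	)

	// An event router with a single dispatch rule, using the same
	// partition/topic expressions exercised by the tests in this patch.
	router, err := dispatcher.NewEventRouter(&config.ReplicaConfig{
		Sink: &config.SinkConfig{
			DispatchRules: []*config.DispatchRule{
				{
					Matcher:       []string{"test.*"},
					PartitionRule: "table",
					TopicRule:     "hello_{schema}",
				},
			},
		},
	}, "default-topic")
	if err != nil {
		panic(err)
	}

	row := &model.RowChangedEvent{
		Table: &model.TableName{Schema: "test", Table: "t1"},
	}

	// The three per-row steps the MQ sink now performs:
	// 1. resolve the target topic for the row,
	// 2. ask the topic manager for the topic's partition count
	//    (creating the topic first if auto-create is enabled),
	// 3. pick a partition within that count.
	topic := router.GetTopic(row)
	partitionNum, err := topicManager.Partitions(topic)
	if err != nil {
		panic(err)
	}
	partition := router.GetPartition(row, partitionNum)

	fmt.Printf("topic=%s partitions=%d partition=%d\n", topic, partitionNum, partition)
}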