From 37a2a2e10b81e8a0ebae9b8b563c83ab1f750305 Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Wed, 9 Mar 2022 19:06:16 +0800 Subject: [PATCH] sink(ticdc): fix lint --- cdc/sink/dispatcher/event_router.go | 11 ++- cdc/sink/dispatcher/event_router_test.go | 102 +++++++++++++++++++---- cdc/sink/manager/kafka/manager.go | 3 +- cdc/sink/manager/kafka/manager_test.go | 6 +- cdc/sink/manager/manager.go | 3 +- cdc/sink/manager/pulsar/manager.go | 6 +- cdc/sink/mq.go | 44 ++++++++-- cdc/sink/producer/kafka/kafka_test.go | 30 ++++++- pkg/kafka/client.go | 6 +- 9 files changed, 175 insertions(+), 36 deletions(-) diff --git a/cdc/sink/dispatcher/event_router.go b/cdc/sink/dispatcher/event_router.go index bd25dda8d54..767d166de35 100644 --- a/cdc/sink/dispatcher/event_router.go +++ b/cdc/sink/dispatcher/event_router.go @@ -114,9 +114,14 @@ func (s *EventRouter) GetTopic(row *model.RowChangedEvent) string { } // GetPartition returns the target partition. -func (s *EventRouter) GetPartition(row *model.RowChangedEvent, partitionNum int32) int32 { - _, partitionDispatcher := s.matchDispatcher(row.Table.Schema, row.Table.Table) - partition := partitionDispatcher.DispatchRowChangedEvent(row, partitionNum) +func (s *EventRouter) GetPartition( + row *model.RowChangedEvent, + partitionNum int32, +) int32 { + _, partitionDispatcher := s.matchDispatcher(row.Table.Schema, + row.Table.Table) + partition := partitionDispatcher.DispatchRowChangedEvent(row, + partitionNum) return partition } diff --git a/cdc/sink/dispatcher/event_router_test.go b/cdc/sink/dispatcher/event_router_test.go index ac0701e72f7..7d26ca71d16 100644 --- a/cdc/sink/dispatcher/event_router_test.go +++ b/cdc/sink/dispatcher/event_router_test.go @@ -102,12 +102,34 @@ func TestGetActiveTopics(t *testing.T) { d, err := NewEventRouter(&config.ReplicaConfig{ Sink: &config.SinkConfig{ DispatchRules: []*config.DispatchRule{ - {Matcher: []string{"test_default1.*"}, PartitionRule: "default"}, - {Matcher: []string{"test_default2.*"}, PartitionRule: "unknown-dispatcher"}, - {Matcher: []string{"test_table.*"}, PartitionRule: "table", TopicRule: "hello_{schema}_world"}, - {Matcher: []string{"test_index_value.*"}, PartitionRule: "index-value", TopicRule: "{schema}_world"}, - {Matcher: []string{"test.*"}, PartitionRule: "rowid", TopicRule: "hello_{schema}"}, - {Matcher: []string{"*.*", "!*.test"}, PartitionRule: "ts", TopicRule: "{schema}_{table}"}, + { + Matcher: []string{"test_default1.*"}, + PartitionRule: "default", + }, + { + Matcher: []string{"test_default2.*"}, + PartitionRule: "unknown-dispatcher", + }, + { + Matcher: []string{"test_table.*"}, + PartitionRule: "table", + TopicRule: "hello_{schema}_world", + }, + { + Matcher: []string{"test_index_value.*"}, + PartitionRule: "index-value", + TopicRule: "{schema}_world", + }, + { + Matcher: []string{"test.*"}, + PartitionRule: "rowid", + TopicRule: "hello_{schema}", + }, + { + Matcher: []string{"*.*", "!*.test"}, + PartitionRule: "ts", + TopicRule: "{schema}_{table}", + }, }, }, }, "test") @@ -130,12 +152,34 @@ func TestGetTopic(t *testing.T) { d, err := NewEventRouter(&config.ReplicaConfig{ Sink: &config.SinkConfig{ DispatchRules: []*config.DispatchRule{ - {Matcher: []string{"test_default1.*"}, PartitionRule: "default"}, - {Matcher: []string{"test_default2.*"}, PartitionRule: "unknown-dispatcher"}, - {Matcher: []string{"test_table.*"}, PartitionRule: "table", TopicRule: "hello_{schema}_world"}, - {Matcher: []string{"test_index_value.*"}, PartitionRule: "index-value", TopicRule: "{schema}_world"}, - {Matcher: []string{"test.*"}, PartitionRule: "rowid", TopicRule: "hello_{schema}"}, - {Matcher: []string{"*.*", "!*.test"}, PartitionRule: "ts", TopicRule: "{schema}_{table}"}, + { + Matcher: []string{"test_default1.*"}, + PartitionRule: "default", + }, + { + Matcher: []string{"test_default2.*"}, + PartitionRule: "unknown-dispatcher", + }, + { + Matcher: []string{"test_table.*"}, + PartitionRule: "table", + TopicRule: "hello_{schema}_world", + }, + { + Matcher: []string{"test_index_value.*"}, + PartitionRule: "index-value", + TopicRule: "{schema}_world", + }, + { + Matcher: []string{"test.*"}, + PartitionRule: "rowid", + TopicRule: "hello_{schema}", + }, + { + Matcher: []string{"*.*", "!*.test"}, + PartitionRule: "ts", + TopicRule: "{schema}_{table}", + }, }, }, }, "test") @@ -169,12 +213,34 @@ func TestGetPartition(t *testing.T) { d, err := NewEventRouter(&config.ReplicaConfig{ Sink: &config.SinkConfig{ DispatchRules: []*config.DispatchRule{ - {Matcher: []string{"test_default1.*"}, PartitionRule: "default"}, - {Matcher: []string{"test_default2.*"}, PartitionRule: "unknown-dispatcher"}, - {Matcher: []string{"test_table.*"}, PartitionRule: "table", TopicRule: "hello_{schema}_world"}, - {Matcher: []string{"test_index_value.*"}, PartitionRule: "index-value", TopicRule: "{schema}_world"}, - {Matcher: []string{"test.*"}, PartitionRule: "rowid", TopicRule: "hello_{schema}"}, - {Matcher: []string{"*.*", "!*.test"}, PartitionRule: "ts", TopicRule: "{schema}_{table}"}, + { + Matcher: []string{"test_default1.*"}, + PartitionRule: "default", + }, + { + Matcher: []string{"test_default2.*"}, + PartitionRule: "unknown-dispatcher", + }, + { + Matcher: []string{"test_table.*"}, + PartitionRule: "table", + TopicRule: "hello_{schema}_world", + }, + { + Matcher: []string{"test_index_value.*"}, + PartitionRule: "index-value", + TopicRule: "{schema}_world", + }, + { + Matcher: []string{"test.*"}, + PartitionRule: "rowid", + TopicRule: "hello_{schema}", + }, + { + Matcher: []string{"*.*", "!*.test"}, + PartitionRule: "ts", + TopicRule: "{schema}_{table}", + }, }, }, }, "test") diff --git a/cdc/sink/manager/kafka/manager.go b/cdc/sink/manager/kafka/manager.go index d5a84227ddb..53822b3c6bb 100644 --- a/cdc/sink/manager/kafka/manager.go +++ b/cdc/sink/manager/kafka/manager.go @@ -110,7 +110,8 @@ func (m *TopicManager) CreateTopic(topicName string) error { if !m.cfg.AutoCreate { return cerror.ErrKafkaInvalidConfig.GenWithStack( - fmt.Sprintf("`auto-create-topic` is false, and %s not found", topicName)) + fmt.Sprintf("`auto-create-topic` is false, "+ + "and %s not found", topicName)) } err = m.admin.CreateTopic(topicName, &sarama.TopicDetail{ diff --git a/cdc/sink/manager/kafka/manager_test.go b/cdc/sink/manager/kafka/manager_test.go index 42a1bbc39f3..03e46c003e0 100644 --- a/cdc/sink/manager/kafka/manager_test.go +++ b/cdc/sink/manager/kafka/manager_test.go @@ -104,5 +104,9 @@ func TestCreateTopic(t *testing.T) { cfg.AutoCreate = false manager = NewTopicManager(client, adminClient, cfg) err = manager.CreateTopic("new-topic2") - require.Regexp(t, "`auto-create-topic` is false, and new-topic2 not found", err) + require.Regexp( + t, + "`auto-create-topic` is false, and new-topic2 not found", + err, + ) } diff --git a/cdc/sink/manager/manager.go b/cdc/sink/manager/manager.go index fd45999bd54..5556007e1bd 100644 --- a/cdc/sink/manager/manager.go +++ b/cdc/sink/manager/manager.go @@ -14,7 +14,8 @@ package manager // TopicManager is the interface of topic manager. -// It will be responsible for creating and updating the information of the topic. +// It will be responsible for creating and +// updating the information of the topic. type TopicManager interface { // Partitions returns the partitions of the topic. Partitions(topic string) (int32, error) diff --git a/cdc/sink/manager/pulsar/manager.go b/cdc/sink/manager/pulsar/manager.go index ff47ad8346f..2c4846e7351 100644 --- a/cdc/sink/manager/pulsar/manager.go +++ b/cdc/sink/manager/pulsar/manager.go @@ -13,8 +13,10 @@ package pulsar -// TopicManager is the interface that wraps the basic Pulsar topic management operations. -// Right now it doesn't have any implementation, Pulsar doesn't support multiple topics yet. +// TopicManager is the interface +// that wraps the basic Pulsar topic management operations. +// Right now it doesn't have any implementation, +// Pulsar doesn't support multiple topics yet. // So it now just returns a fixed number of partitions for a fixed topic. type TopicManager struct { partitionNum int32 diff --git a/cdc/sink/mq.go b/cdc/sink/mq.go index bafe2dbee8a..cbe3cb35d47 100644 --- a/cdc/sink/mq.go +++ b/cdc/sink/mq.go @@ -364,17 +364,39 @@ func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) } - topicManager := kafkamanager.NewTopicManager(client, adminClient, baseConfig.DeriveTopicConfig()) + topicManager := kafkamanager.NewTopicManager( + client, + adminClient, + baseConfig.DeriveTopicConfig(), + ) if err := topicManager.CreateTopic(topic); err != nil { return nil, cerror.WrapError(cerror.ErrKafkaCreateTopic, err) } - sProducer, err := kafka.NewKafkaSaramaProducer(ctx, client, adminClient, topic, baseConfig, saramaConfig, errCh) + sProducer, err := kafka.NewKafkaSaramaProducer( + ctx, + client, + adminClient, + topic, + baseConfig, + saramaConfig, + errCh, + ) if err != nil { return nil, errors.Trace(err) } - sink, err := newMqSink(ctx, baseConfig.Credential, topicManager, sProducer, filter, topic, replicaConfig, encoderConfig, errCh) + sink, err := newMqSink( + ctx, + baseConfig.Credential, + topicManager, + sProducer, + filter, + topic, + replicaConfig, + encoderConfig, + errCh, + ) if err != nil { return nil, errors.Trace(err) } @@ -414,8 +436,20 @@ func newPulsarSink(ctx context.Context, sinkURI *url.URL, filter *filter.Filter, // For now, it's a placeholder. Avro format have to make connection to Schema Registry, // and it may need credential. credential := &security.Credential{} - fakeTopicManager := pulsarmanager.NewTopicManager(producer.GetPartitionNum()) - sink, err := newMqSink(ctx, credential, fakeTopicManager, producer, filter, "", replicaConfig, encoderConfig, errCh) + fakeTopicManager := pulsarmanager.NewTopicManager( + producer.GetPartitionNum(), + ) + sink, err := newMqSink( + ctx, + credential, + fakeTopicManager, + producer, + filter, + "", + replicaConfig, + encoderConfig, + errCh, + ) if err != nil { return nil, errors.Trace(err) } diff --git a/cdc/sink/producer/kafka/kafka_test.go b/cdc/sink/producer/kafka/kafka_test.go index a5f8bdb66bd..8ace76310c6 100644 --- a/cdc/sink/producer/kafka/kafka_test.go +++ b/cdc/sink/producer/kafka/kafka_test.go @@ -108,7 +108,15 @@ func (s *kafkaSuite) TestNewSaramaProducer(c *check.C) { c.Assert(err, check.IsNil) adminClient, err := NewAdminClientImpl(config.BrokerEndpoints, saramaConfig) c.Assert(err, check.IsNil) - producer, err := NewKafkaSaramaProducer(ctx, client, adminClient, topic, config, saramaConfig, errCh) + producer, err := NewKafkaSaramaProducer( + ctx, + client, + adminClient, + topic, + config, + saramaConfig, + errCh, + ) c.Assert(err, check.IsNil) c.Assert(producer.GetPartitionNum(), check.Equals, int32(2)) @@ -377,7 +385,15 @@ func (s *kafkaSuite) TestProducerSendMessageFailed(c *check.C) { c.Assert(err, check.IsNil) adminClient, err := NewAdminClientImpl(config.BrokerEndpoints, saramaConfig) c.Assert(err, check.IsNil) - producer, err := NewKafkaSaramaProducer(ctx, client, adminClient, topic, config, saramaConfig, errCh) + producer, err := NewKafkaSaramaProducer( + ctx, + client, + adminClient, + topic, + config, + saramaConfig, + errCh, + ) defer func() { err := producer.Close() c.Assert(err, check.IsNil) @@ -451,7 +467,15 @@ func (s *kafkaSuite) TestProducerDoubleClose(c *check.C) { c.Assert(err, check.IsNil) adminClient, err := NewAdminClientImpl(config.BrokerEndpoints, saramaConfig) c.Assert(err, check.IsNil) - producer, err := NewKafkaSaramaProducer(ctx, client, adminClient, topic, config, saramaConfig, errCh) + producer, err := NewKafkaSaramaProducer( + ctx, + client, + adminClient, + topic, + config, + saramaConfig, + errCh, + ) defer func() { err := producer.Close() c.Assert(err, check.IsNil) diff --git a/pkg/kafka/client.go b/pkg/kafka/client.go index e52164d99b9..b6b8bf24f89 100644 --- a/pkg/kafka/client.go +++ b/pkg/kafka/client.go @@ -15,8 +15,10 @@ package kafka // Client is a generic Kafka client. type Client interface { - // Topics returns the set of available topics as retrieved from cluster metadata. + // Topics returns the set of available + // topics as retrieved from cluster metadata. Topics() ([]string, error) - // Partitions returns the sorted list of all partition IDs for the given topic. + // Partitions returns the sorted list of + // all partition IDs for the given topic. Partitions(topic string) ([]int32, error) }