Skip to content

Commit

Permalink
sink(ticdc): fix lint
Browse files Browse the repository at this point in the history
  • Loading branch information
Rustin170506 committed Mar 9, 2022
1 parent 308ef69 commit 37a2a2e
Show file tree
Hide file tree
Showing 9 changed files with 175 additions and 36 deletions.
11 changes: 8 additions & 3 deletions cdc/sink/dispatcher/event_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,14 @@ func (s *EventRouter) GetTopic(row *model.RowChangedEvent) string {
}

// GetPartition returns the target partition.
func (s *EventRouter) GetPartition(row *model.RowChangedEvent, partitionNum int32) int32 {
_, partitionDispatcher := s.matchDispatcher(row.Table.Schema, row.Table.Table)
partition := partitionDispatcher.DispatchRowChangedEvent(row, partitionNum)
func (s *EventRouter) GetPartition(
row *model.RowChangedEvent,
partitionNum int32,
) int32 {
_, partitionDispatcher := s.matchDispatcher(row.Table.Schema,
row.Table.Table)
partition := partitionDispatcher.DispatchRowChangedEvent(row,
partitionNum)

return partition
}
Expand Down
102 changes: 84 additions & 18 deletions cdc/sink/dispatcher/event_router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,34 @@ func TestGetActiveTopics(t *testing.T) {
d, err := NewEventRouter(&config.ReplicaConfig{
Sink: &config.SinkConfig{
DispatchRules: []*config.DispatchRule{
{Matcher: []string{"test_default1.*"}, PartitionRule: "default"},
{Matcher: []string{"test_default2.*"}, PartitionRule: "unknown-dispatcher"},
{Matcher: []string{"test_table.*"}, PartitionRule: "table", TopicRule: "hello_{schema}_world"},
{Matcher: []string{"test_index_value.*"}, PartitionRule: "index-value", TopicRule: "{schema}_world"},
{Matcher: []string{"test.*"}, PartitionRule: "rowid", TopicRule: "hello_{schema}"},
{Matcher: []string{"*.*", "!*.test"}, PartitionRule: "ts", TopicRule: "{schema}_{table}"},
{
Matcher: []string{"test_default1.*"},
PartitionRule: "default",
},
{
Matcher: []string{"test_default2.*"},
PartitionRule: "unknown-dispatcher",
},
{
Matcher: []string{"test_table.*"},
PartitionRule: "table",
TopicRule: "hello_{schema}_world",
},
{
Matcher: []string{"test_index_value.*"},
PartitionRule: "index-value",
TopicRule: "{schema}_world",
},
{
Matcher: []string{"test.*"},
PartitionRule: "rowid",
TopicRule: "hello_{schema}",
},
{
Matcher: []string{"*.*", "!*.test"},
PartitionRule: "ts",
TopicRule: "{schema}_{table}",
},
},
},
}, "test")
Expand All @@ -130,12 +152,34 @@ func TestGetTopic(t *testing.T) {
d, err := NewEventRouter(&config.ReplicaConfig{
Sink: &config.SinkConfig{
DispatchRules: []*config.DispatchRule{
{Matcher: []string{"test_default1.*"}, PartitionRule: "default"},
{Matcher: []string{"test_default2.*"}, PartitionRule: "unknown-dispatcher"},
{Matcher: []string{"test_table.*"}, PartitionRule: "table", TopicRule: "hello_{schema}_world"},
{Matcher: []string{"test_index_value.*"}, PartitionRule: "index-value", TopicRule: "{schema}_world"},
{Matcher: []string{"test.*"}, PartitionRule: "rowid", TopicRule: "hello_{schema}"},
{Matcher: []string{"*.*", "!*.test"}, PartitionRule: "ts", TopicRule: "{schema}_{table}"},
{
Matcher: []string{"test_default1.*"},
PartitionRule: "default",
},
{
Matcher: []string{"test_default2.*"},
PartitionRule: "unknown-dispatcher",
},
{
Matcher: []string{"test_table.*"},
PartitionRule: "table",
TopicRule: "hello_{schema}_world",
},
{
Matcher: []string{"test_index_value.*"},
PartitionRule: "index-value",
TopicRule: "{schema}_world",
},
{
Matcher: []string{"test.*"},
PartitionRule: "rowid",
TopicRule: "hello_{schema}",
},
{
Matcher: []string{"*.*", "!*.test"},
PartitionRule: "ts",
TopicRule: "{schema}_{table}",
},
},
},
}, "test")
Expand Down Expand Up @@ -169,12 +213,34 @@ func TestGetPartition(t *testing.T) {
d, err := NewEventRouter(&config.ReplicaConfig{
Sink: &config.SinkConfig{
DispatchRules: []*config.DispatchRule{
{Matcher: []string{"test_default1.*"}, PartitionRule: "default"},
{Matcher: []string{"test_default2.*"}, PartitionRule: "unknown-dispatcher"},
{Matcher: []string{"test_table.*"}, PartitionRule: "table", TopicRule: "hello_{schema}_world"},
{Matcher: []string{"test_index_value.*"}, PartitionRule: "index-value", TopicRule: "{schema}_world"},
{Matcher: []string{"test.*"}, PartitionRule: "rowid", TopicRule: "hello_{schema}"},
{Matcher: []string{"*.*", "!*.test"}, PartitionRule: "ts", TopicRule: "{schema}_{table}"},
{
Matcher: []string{"test_default1.*"},
PartitionRule: "default",
},
{
Matcher: []string{"test_default2.*"},
PartitionRule: "unknown-dispatcher",
},
{
Matcher: []string{"test_table.*"},
PartitionRule: "table",
TopicRule: "hello_{schema}_world",
},
{
Matcher: []string{"test_index_value.*"},
PartitionRule: "index-value",
TopicRule: "{schema}_world",
},
{
Matcher: []string{"test.*"},
PartitionRule: "rowid",
TopicRule: "hello_{schema}",
},
{
Matcher: []string{"*.*", "!*.test"},
PartitionRule: "ts",
TopicRule: "{schema}_{table}",
},
},
},
}, "test")
Expand Down
3 changes: 2 additions & 1 deletion cdc/sink/manager/kafka/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ func (m *TopicManager) CreateTopic(topicName string) error {

if !m.cfg.AutoCreate {
return cerror.ErrKafkaInvalidConfig.GenWithStack(
fmt.Sprintf("`auto-create-topic` is false, and %s not found", topicName))
fmt.Sprintf("`auto-create-topic` is false, "+
"and %s not found", topicName))
}

err = m.admin.CreateTopic(topicName, &sarama.TopicDetail{
Expand Down
6 changes: 5 additions & 1 deletion cdc/sink/manager/kafka/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,5 +104,9 @@ func TestCreateTopic(t *testing.T) {
cfg.AutoCreate = false
manager = NewTopicManager(client, adminClient, cfg)
err = manager.CreateTopic("new-topic2")
require.Regexp(t, "`auto-create-topic` is false, and new-topic2 not found", err)
require.Regexp(
t,
"`auto-create-topic` is false, and new-topic2 not found",
err,
)
}
3 changes: 2 additions & 1 deletion cdc/sink/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
package manager

// TopicManager is the interface of topic manager.
// It will be responsible for creating and updating the information of the topic.
// It will be responsible for creating and
// updating the information of the topic.
type TopicManager interface {
// Partitions returns the partitions of the topic.
Partitions(topic string) (int32, error)
Expand Down
6 changes: 4 additions & 2 deletions cdc/sink/manager/pulsar/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@

package pulsar

// TopicManager is the interface that wraps the basic Pulsar topic management operations.
// Right now it doesn't have any implementation, Pulsar doesn't support multiple topics yet.
// TopicManager is the interface
// that wraps the basic Pulsar topic management operations.
// Right now it doesn't have any implementation,
// Pulsar doesn't support multiple topics yet.
// So it now just returns a fixed number of partitions for a fixed topic.
type TopicManager struct {
partitionNum int32
Expand Down
44 changes: 39 additions & 5 deletions cdc/sink/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,17 +364,39 @@ func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL,
return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}

topicManager := kafkamanager.NewTopicManager(client, adminClient, baseConfig.DeriveTopicConfig())
topicManager := kafkamanager.NewTopicManager(
client,
adminClient,
baseConfig.DeriveTopicConfig(),
)
if err := topicManager.CreateTopic(topic); err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaCreateTopic, err)
}

sProducer, err := kafka.NewKafkaSaramaProducer(ctx, client, adminClient, topic, baseConfig, saramaConfig, errCh)
sProducer, err := kafka.NewKafkaSaramaProducer(
ctx,
client,
adminClient,
topic,
baseConfig,
saramaConfig,
errCh,
)
if err != nil {
return nil, errors.Trace(err)
}

sink, err := newMqSink(ctx, baseConfig.Credential, topicManager, sProducer, filter, topic, replicaConfig, encoderConfig, errCh)
sink, err := newMqSink(
ctx,
baseConfig.Credential,
topicManager,
sProducer,
filter,
topic,
replicaConfig,
encoderConfig,
errCh,
)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -414,8 +436,20 @@ func newPulsarSink(ctx context.Context, sinkURI *url.URL, filter *filter.Filter,
// For now, it's a placeholder. Avro format have to make connection to Schema Registry,
// and it may need credential.
credential := &security.Credential{}
fakeTopicManager := pulsarmanager.NewTopicManager(producer.GetPartitionNum())
sink, err := newMqSink(ctx, credential, fakeTopicManager, producer, filter, "", replicaConfig, encoderConfig, errCh)
fakeTopicManager := pulsarmanager.NewTopicManager(
producer.GetPartitionNum(),
)
sink, err := newMqSink(
ctx,
credential,
fakeTopicManager,
producer,
filter,
"",
replicaConfig,
encoderConfig,
errCh,
)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
30 changes: 27 additions & 3 deletions cdc/sink/producer/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,15 @@ func (s *kafkaSuite) TestNewSaramaProducer(c *check.C) {
c.Assert(err, check.IsNil)
adminClient, err := NewAdminClientImpl(config.BrokerEndpoints, saramaConfig)
c.Assert(err, check.IsNil)
producer, err := NewKafkaSaramaProducer(ctx, client, adminClient, topic, config, saramaConfig, errCh)
producer, err := NewKafkaSaramaProducer(
ctx,
client,
adminClient,
topic,
config,
saramaConfig,
errCh,
)
c.Assert(err, check.IsNil)
c.Assert(producer.GetPartitionNum(), check.Equals, int32(2))

Expand Down Expand Up @@ -377,7 +385,15 @@ func (s *kafkaSuite) TestProducerSendMessageFailed(c *check.C) {
c.Assert(err, check.IsNil)
adminClient, err := NewAdminClientImpl(config.BrokerEndpoints, saramaConfig)
c.Assert(err, check.IsNil)
producer, err := NewKafkaSaramaProducer(ctx, client, adminClient, topic, config, saramaConfig, errCh)
producer, err := NewKafkaSaramaProducer(
ctx,
client,
adminClient,
topic,
config,
saramaConfig,
errCh,
)
defer func() {
err := producer.Close()
c.Assert(err, check.IsNil)
Expand Down Expand Up @@ -451,7 +467,15 @@ func (s *kafkaSuite) TestProducerDoubleClose(c *check.C) {
c.Assert(err, check.IsNil)
adminClient, err := NewAdminClientImpl(config.BrokerEndpoints, saramaConfig)
c.Assert(err, check.IsNil)
producer, err := NewKafkaSaramaProducer(ctx, client, adminClient, topic, config, saramaConfig, errCh)
producer, err := NewKafkaSaramaProducer(
ctx,
client,
adminClient,
topic,
config,
saramaConfig,
errCh,
)
defer func() {
err := producer.Close()
c.Assert(err, check.IsNil)
Expand Down
6 changes: 4 additions & 2 deletions pkg/kafka/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@ package kafka

// Client is a generic Kafka client.
type Client interface {
// Topics returns the set of available topics as retrieved from cluster metadata.
// Topics returns the set of available
// topics as retrieved from cluster metadata.
Topics() ([]string, error)
// Partitions returns the sorted list of all partition IDs for the given topic.
// Partitions returns the sorted list of
// all partition IDs for the given topic.
Partitions(topic string) ([]int32, error)
}

0 comments on commit 37a2a2e

Please sign in to comment.