Skip to content

Commit

Permalink
sink(ticdc): pass client and admin client
Browse files Browse the repository at this point in the history
  • Loading branch information
Rustin170506 committed Mar 9, 2022
1 parent 301ff7d commit 2010ada
Show file tree
Hide file tree
Showing 10 changed files with 231 additions and 56 deletions.
11 changes: 8 additions & 3 deletions cdc/sink/dispatcher/event_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,14 @@ func (s *EventRouter) GetTopic(row *model.RowChangedEvent) string {
}

// GetPartition returns the target partition.
func (s *EventRouter) GetPartition(row *model.RowChangedEvent, partitionNum int32) int32 {
_, partitionDispatcher := s.matchDispatcher(row.Table.Schema, row.Table.Table)
partition := partitionDispatcher.DispatchRowChangedEvent(row, partitionNum)
func (s *EventRouter) GetPartition(
row *model.RowChangedEvent,
partitionNum int32,
) int32 {
_, partitionDispatcher := s.matchDispatcher(row.Table.Schema,
row.Table.Table)
partition := partitionDispatcher.DispatchRowChangedEvent(row,
partitionNum)

return partition
}
Expand Down
136 changes: 112 additions & 24 deletions cdc/sink/dispatcher/event_router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,34 @@ func TestEventRouter(t *testing.T) {
d, err = NewEventRouter(&config.ReplicaConfig{
Sink: &config.SinkConfig{
DispatchRules: []*config.DispatchRule{
{Matcher: []string{"test_default1.*"}, PartitionRule: "default"},
{Matcher: []string{"test_default2.*"}, PartitionRule: "unknown-dispatcher"},
{Matcher: []string{"test_table.*"}, PartitionRule: "table", TopicRule: "hello_{schema}_world"},
{Matcher: []string{"test_index_value.*"}, PartitionRule: "index-value", TopicRule: "{schema}_world"},
{Matcher: []string{"test.*"}, PartitionRule: "rowid", TopicRule: "hello_{schema}"},
{Matcher: []string{"*.*", "!*.test"}, PartitionRule: "ts", TopicRule: "{schema}_{table}"},
{
Matcher: []string{"test_default1.*"},
PartitionRule: "default",
},
{
Matcher: []string{"test_default2.*"},
PartitionRule: "unknown-dispatcher",
},
{
Matcher: []string{"test_table.*"},
PartitionRule: "table",
TopicRule: "hello_{schema}_world",
},
{
Matcher: []string{"test_index_value.*"},
PartitionRule: "index-value",
TopicRule: "{schema}_world",
},
{
Matcher: []string{"test.*"},
PartitionRule: "rowid",
TopicRule: "hello_{schema}",
},
{
Matcher: []string{"*.*", "!*.test"},
PartitionRule: "ts",
TopicRule: "{schema}_{table}",
},
},
},
}, "")
Expand Down Expand Up @@ -80,12 +102,34 @@ func TestGetActiveTopics(t *testing.T) {
d, err := NewEventRouter(&config.ReplicaConfig{
Sink: &config.SinkConfig{
DispatchRules: []*config.DispatchRule{
{Matcher: []string{"test_default1.*"}, PartitionRule: "default"},
{Matcher: []string{"test_default2.*"}, PartitionRule: "unknown-dispatcher"},
{Matcher: []string{"test_table.*"}, PartitionRule: "table", TopicRule: "hello_{schema}_world"},
{Matcher: []string{"test_index_value.*"}, PartitionRule: "index-value", TopicRule: "{schema}_world"},
{Matcher: []string{"test.*"}, PartitionRule: "rowid", TopicRule: "hello_{schema}"},
{Matcher: []string{"*.*", "!*.test"}, PartitionRule: "ts", TopicRule: "{schema}_{table}"},
{
Matcher: []string{"test_default1.*"},
PartitionRule: "default",
},
{
Matcher: []string{"test_default2.*"},
PartitionRule: "unknown-dispatcher",
},
{
Matcher: []string{"test_table.*"},
PartitionRule: "table",
TopicRule: "hello_{schema}_world",
},
{
Matcher: []string{"test_index_value.*"},
PartitionRule: "index-value",
TopicRule: "{schema}_world",
},
{
Matcher: []string{"test.*"},
PartitionRule: "rowid",
TopicRule: "hello_{schema}",
},
{
Matcher: []string{"*.*", "!*.test"},
PartitionRule: "ts",
TopicRule: "{schema}_{table}",
},
},
},
}, "test")
Expand All @@ -108,12 +152,34 @@ func TestGetTopic(t *testing.T) {
d, err := NewEventRouter(&config.ReplicaConfig{
Sink: &config.SinkConfig{
DispatchRules: []*config.DispatchRule{
{Matcher: []string{"test_default1.*"}, PartitionRule: "default"},
{Matcher: []string{"test_default2.*"}, PartitionRule: "unknown-dispatcher"},
{Matcher: []string{"test_table.*"}, PartitionRule: "table", TopicRule: "hello_{schema}_world"},
{Matcher: []string{"test_index_value.*"}, PartitionRule: "index-value", TopicRule: "{schema}_world"},
{Matcher: []string{"test.*"}, PartitionRule: "rowid", TopicRule: "hello_{schema}"},
{Matcher: []string{"*.*", "!*.test"}, PartitionRule: "ts", TopicRule: "{schema}_{table}"},
{
Matcher: []string{"test_default1.*"},
PartitionRule: "default",
},
{
Matcher: []string{"test_default2.*"},
PartitionRule: "unknown-dispatcher",
},
{
Matcher: []string{"test_table.*"},
PartitionRule: "table",
TopicRule: "hello_{schema}_world",
},
{
Matcher: []string{"test_index_value.*"},
PartitionRule: "index-value",
TopicRule: "{schema}_world",
},
{
Matcher: []string{"test.*"},
PartitionRule: "rowid",
TopicRule: "hello_{schema}",
},
{
Matcher: []string{"*.*", "!*.test"},
PartitionRule: "ts",
TopicRule: "{schema}_{table}",
},
},
},
}, "test")
Expand Down Expand Up @@ -147,12 +213,34 @@ func TestGetPartition(t *testing.T) {
d, err := NewEventRouter(&config.ReplicaConfig{
Sink: &config.SinkConfig{
DispatchRules: []*config.DispatchRule{
{Matcher: []string{"test_default1.*"}, PartitionRule: "default"},
{Matcher: []string{"test_default2.*"}, PartitionRule: "unknown-dispatcher"},
{Matcher: []string{"test_table.*"}, PartitionRule: "table", TopicRule: "hello_{schema}_world"},
{Matcher: []string{"test_index_value.*"}, PartitionRule: "index-value", TopicRule: "{schema}_world"},
{Matcher: []string{"test.*"}, PartitionRule: "rowid", TopicRule: "hello_{schema}"},
{Matcher: []string{"*.*", "!*.test"}, PartitionRule: "ts", TopicRule: "{schema}_{table}"},
{
Matcher: []string{"test_default1.*"},
PartitionRule: "default",
},
{
Matcher: []string{"test_default2.*"},
PartitionRule: "unknown-dispatcher",
},
{
Matcher: []string{"test_table.*"},
PartitionRule: "table",
TopicRule: "hello_{schema}_world",
},
{
Matcher: []string{"test_index_value.*"},
PartitionRule: "index-value",
TopicRule: "{schema}_world",
},
{
Matcher: []string{"test.*"},
PartitionRule: "rowid",
TopicRule: "hello_{schema}",
},
{
Matcher: []string{"*.*", "!*.test"},
PartitionRule: "ts",
TopicRule: "{schema}_{table}",
},
},
},
}, "test")
Expand Down
9 changes: 7 additions & 2 deletions cdc/sink/manager/kafka/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,11 @@ type TopicManager struct {
}

// NewTopicManager creates a new topic manager.
func NewTopicManager(client kafka.Client, admin kafka.ClusterAdminClient, cfg *kafkaconfig.AutoCreateTopicConfig) *TopicManager {
func NewTopicManager(
client kafka.Client,
admin kafka.ClusterAdminClient,
cfg *kafkaconfig.AutoCreateTopicConfig,
) *TopicManager {
return &TopicManager{
client: client,
admin: admin,
Expand Down Expand Up @@ -106,7 +110,8 @@ func (m *TopicManager) CreateTopic(topicName string) error {

if !m.cfg.AutoCreate {
return cerror.ErrKafkaInvalidConfig.GenWithStack(
fmt.Sprintf("`auto-create-topic` is false, and %s not found", topicName))
fmt.Sprintf("`auto-create-topic` is false, "+
"and %s not found", topicName))
}

err = m.admin.CreateTopic(topicName, &sarama.TopicDetail{
Expand Down
6 changes: 5 additions & 1 deletion cdc/sink/manager/kafka/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,5 +104,9 @@ func TestCreateTopic(t *testing.T) {
cfg.AutoCreate = false
manager = NewTopicManager(client, adminClient, cfg)
err = manager.CreateTopic("new-topic2")
require.Regexp(t, "`auto-create-topic` is false, and new-topic2 not found", err)
require.Regexp(
t,
"`auto-create-topic` is false, and new-topic2 not found",
err,
)
}
3 changes: 2 additions & 1 deletion cdc/sink/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
package manager

// TopicManager is the interface of topic manager.
// It will be responsible for creating and updating the information of the topic.
// It will be responsible for creating and
// updating the information of the topic.
type TopicManager interface {
// Partitions returns the partitions of the topic.
Partitions(topic string) (int32, error)
Expand Down
6 changes: 4 additions & 2 deletions cdc/sink/manager/pulsar/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@

package pulsar

// TopicManager is the interface that wraps the basic Pulsar topic management operations.
// Right now it doesn't have any implementation, Pulsar doesn't support multiple topics yet.
// TopicManager is the interface
// that wraps the basic Pulsar topic management operations.
// Right now it doesn't have any implementation,
// Pulsar doesn't support multiple topics yet.
// So it now just returns a fixed number of partitions for a fixed topic.
type TopicManager struct {
partitionNum int32
Expand Down
44 changes: 39 additions & 5 deletions cdc/sink/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,17 +364,39 @@ func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL,
return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}

topicManager := kafkamanager.NewTopicManager(client, adminClient, baseConfig.DeriveTopicConfig())
topicManager := kafkamanager.NewTopicManager(
client,
adminClient,
baseConfig.DeriveTopicConfig(),
)
if err := topicManager.CreateTopic(topic); err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaCreateTopic, err)
}

sProducer, err := kafka.NewKafkaSaramaProducer(ctx, topic, baseConfig, saramaConfig, errCh)
sProducer, err := kafka.NewKafkaSaramaProducer(
ctx,
client,
adminClient,
topic,
baseConfig,
saramaConfig,
errCh,
)
if err != nil {
return nil, errors.Trace(err)
}

sink, err := newMqSink(ctx, baseConfig.Credential, topicManager, sProducer, filter, topic, replicaConfig, encoderConfig, errCh)
sink, err := newMqSink(
ctx,
baseConfig.Credential,
topicManager,
sProducer,
filter,
topic,
replicaConfig,
encoderConfig,
errCh,
)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -414,8 +436,20 @@ func newPulsarSink(ctx context.Context, sinkURI *url.URL, filter *filter.Filter,
// For now, it's a placeholder. Avro format have to make connection to Schema Registry,
// and it may need credential.
credential := &security.Credential{}
fakeTopicManager := pulsarmanager.NewTopicManager(producer.GetPartitionNum())
sink, err := newMqSink(ctx, credential, fakeTopicManager, producer, filter, "", replicaConfig, encoderConfig, errCh)
fakeTopicManager := pulsarmanager.NewTopicManager(
producer.GetPartitionNum(),
)
sink, err := newMqSink(
ctx,
credential,
fakeTopicManager,
producer,
filter,
"",
replicaConfig,
encoderConfig,
errCh,
)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
24 changes: 11 additions & 13 deletions cdc/sink/producer/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ type kafkaSaramaProducer struct {
// clientLock is used to protect concurrent access of asyncProducer and syncProducer.
// Since we don't close these two clients (which have an input chan) from the
// sender routine, data race or send on closed chan could happen.
clientLock sync.RWMutex
clientLock sync.RWMutex
// This admin mainly used by `metricsMonitor` to fetch broker info.
admin kafka.ClusterAdminClient
client sarama.Client
asyncProducer sarama.AsyncProducer
Expand Down Expand Up @@ -317,23 +318,20 @@ func (k *kafkaSaramaProducer) run(ctx context.Context) error {
var NewAdminClientImpl kafka.ClusterAdminClientCreator = kafka.NewSaramaAdminClient

// NewKafkaSaramaProducer creates a kafka sarama producer
func NewKafkaSaramaProducer(ctx context.Context, topic string, config *Config, saramaConfig *sarama.Config, errCh chan error) (*kafkaSaramaProducer, error) {
func NewKafkaSaramaProducer(
ctx context.Context,
client sarama.Client,
admin kafka.ClusterAdminClient,
topic string,
config *Config,
saramaConfig *sarama.Config,
errCh chan error,
) (*kafkaSaramaProducer, error) {
changefeedID := util.ChangefeedIDFromCtx(ctx)
role := util.RoleFromCtx(ctx)
log.Info("Starting kafka sarama producer ...", zap.Any("config", config),
zap.String("changefeed", changefeedID), zap.Any("role", role))

// this admin mainly used by `metricsMonitor` to fetch broker info.
admin, err := NewAdminClientImpl(config.BrokerEndpoints, saramaConfig)
if err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}

client, err := sarama.NewClient(config.BrokerEndpoints, saramaConfig)
if err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}

asyncProducer, err := sarama.NewAsyncProducerFromClient(client)
if err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
Expand Down
Loading

0 comments on commit 2010ada

Please sign in to comment.