Skip to content

Commit

Permalink
sink(ticdc): add topic manager (#4810)
Browse files Browse the repository at this point in the history
ref #4423
  • Loading branch information
Rustin170506 committed Mar 12, 2022
1 parent 1e8f99f commit 7b955b5
Show file tree
Hide file tree
Showing 12 changed files with 782 additions and 134 deletions.
22 changes: 16 additions & 6 deletions cdc/sink/dispatcher/event_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,23 @@ func NewEventRouter(cfg *config.ReplicaConfig, defaultTopic string) (*EventRoute
}, nil
}

// DispatchRowChangedEvent returns the target topic and partition.
func (s *EventRouter) DispatchRowChangedEvent(row *model.RowChangedEvent, partitionNum int32) (string, int32) {
topicDispatcher, partitionDispatcher := s.matchDispatcher(row.Table.Schema, row.Table.Table)
topic := topicDispatcher.DispatchRowChangedEvent(row)
partition := partitionDispatcher.DispatchRowChangedEvent(row, partitionNum)
// GetTopic returns the target topic.
func (s *EventRouter) GetTopic(row *model.RowChangedEvent) string {
topicDispatcher, _ := s.matchDispatcher(row.Table.Schema, row.Table.Table)
return topicDispatcher.DispatchRowChangedEvent(row)
}

return topic, partition
// GetPartition returns the target partition.
func (s *EventRouter) GetPartition(
row *model.RowChangedEvent,
partitionNum int32,
) int32 {
_, partitionDispatcher := s.matchDispatcher(row.Table.Schema,
row.Table.Table)
partition := partitionDispatcher.DispatchRowChangedEvent(row,
partitionNum)

return partition
}

// GetActiveTopics returns a list of the corresponding topics
Expand Down
220 changes: 208 additions & 12 deletions cdc/sink/dispatcher/event_router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,34 @@ func TestEventRouter(t *testing.T) {
d, err = NewEventRouter(&config.ReplicaConfig{
Sink: &config.SinkConfig{
DispatchRules: []*config.DispatchRule{
{Matcher: []string{"test_default1.*"}, PartitionRule: "default"},
{Matcher: []string{"test_default2.*"}, PartitionRule: "unknown-dispatcher"},
{Matcher: []string{"test_table.*"}, PartitionRule: "table", TopicRule: "hello_{schema}_world"},
{Matcher: []string{"test_index_value.*"}, PartitionRule: "index-value", TopicRule: "{schema}_world"},
{Matcher: []string{"test.*"}, PartitionRule: "rowid", TopicRule: "hello_{schema}"},
{Matcher: []string{"*.*", "!*.test"}, PartitionRule: "ts", TopicRule: "{schema}_{table}"},
{
Matcher: []string{"test_default1.*"},
PartitionRule: "default",
},
{
Matcher: []string{"test_default2.*"},
PartitionRule: "unknown-dispatcher",
},
{
Matcher: []string{"test_table.*"},
PartitionRule: "table",
TopicRule: "hello_{schema}_world",
},
{
Matcher: []string{"test_index_value.*"},
PartitionRule: "index-value",
TopicRule: "{schema}_world",
},
{
Matcher: []string{"test.*"},
PartitionRule: "rowid",
TopicRule: "hello_{schema}",
},
{
Matcher: []string{"*.*", "!*.test"},
PartitionRule: "ts",
TopicRule: "{schema}_{table}",
},
},
},
}, "")
Expand Down Expand Up @@ -80,12 +102,34 @@ func TestGetActiveTopics(t *testing.T) {
d, err := NewEventRouter(&config.ReplicaConfig{
Sink: &config.SinkConfig{
DispatchRules: []*config.DispatchRule{
{Matcher: []string{"test_default1.*"}, PartitionRule: "default"},
{Matcher: []string{"test_default2.*"}, PartitionRule: "unknown-dispatcher"},
{Matcher: []string{"test_table.*"}, PartitionRule: "table", TopicRule: "hello_{schema}_world"},
{Matcher: []string{"test_index_value.*"}, PartitionRule: "index-value", TopicRule: "{schema}_world"},
{Matcher: []string{"test.*"}, PartitionRule: "rowid", TopicRule: "hello_{schema}"},
{Matcher: []string{"*.*", "!*.test"}, PartitionRule: "ts", TopicRule: "{schema}_{table}"},
{
Matcher: []string{"test_default1.*"},
PartitionRule: "default",
},
{
Matcher: []string{"test_default2.*"},
PartitionRule: "unknown-dispatcher",
},
{
Matcher: []string{"test_table.*"},
PartitionRule: "table",
TopicRule: "hello_{schema}_world",
},
{
Matcher: []string{"test_index_value.*"},
PartitionRule: "index-value",
TopicRule: "{schema}_world",
},
{
Matcher: []string{"test.*"},
PartitionRule: "rowid",
TopicRule: "hello_{schema}",
},
{
Matcher: []string{"*.*", "!*.test"},
PartitionRule: "ts",
TopicRule: "{schema}_{table}",
},
},
},
}, "test")
Expand All @@ -101,3 +145,155 @@ func TestGetActiveTopics(t *testing.T) {
topics := d.GetActiveTopics(names)
require.Equal(t, []string{"test", "hello_test_table_world", "test_index_value_world", "hello_test", "sbs_table"}, topics)
}

func TestGetTopic(t *testing.T) {
t.Parallel()

d, err := NewEventRouter(&config.ReplicaConfig{
Sink: &config.SinkConfig{
DispatchRules: []*config.DispatchRule{
{
Matcher: []string{"test_default1.*"},
PartitionRule: "default",
},
{
Matcher: []string{"test_default2.*"},
PartitionRule: "unknown-dispatcher",
},
{
Matcher: []string{"test_table.*"},
PartitionRule: "table",
TopicRule: "hello_{schema}_world",
},
{
Matcher: []string{"test_index_value.*"},
PartitionRule: "index-value",
TopicRule: "{schema}_world",
},
{
Matcher: []string{"test.*"},
PartitionRule: "rowid",
TopicRule: "hello_{schema}",
},
{
Matcher: []string{"*.*", "!*.test"},
PartitionRule: "ts",
TopicRule: "{schema}_{table}",
},
},
},
}, "test")
require.Nil(t, err)

topicName := d.GetTopic(&model.RowChangedEvent{
Table: &model.TableName{Schema: "test_default1", Table: "table"},
})
require.Equal(t, "test", topicName)
topicName = d.GetTopic(&model.RowChangedEvent{
Table: &model.TableName{Schema: "test_default2", Table: "table"},
})
require.Equal(t, "test", topicName)
topicName = d.GetTopic(&model.RowChangedEvent{
Table: &model.TableName{Schema: "test_table", Table: "table"},
})
require.Equal(t, "hello_test_table_world", topicName)
topicName = d.GetTopic(&model.RowChangedEvent{
Table: &model.TableName{Schema: "test_index_value", Table: "table"},
})
require.Equal(t, "test_index_value_world", topicName)
topicName = d.GetTopic(&model.RowChangedEvent{
Table: &model.TableName{Schema: "a", Table: "table"},
})
require.Equal(t, "a_table", topicName)
}

func TestGetPartition(t *testing.T) {
t.Parallel()

d, err := NewEventRouter(&config.ReplicaConfig{
Sink: &config.SinkConfig{
DispatchRules: []*config.DispatchRule{
{
Matcher: []string{"test_default1.*"},
PartitionRule: "default",
},
{
Matcher: []string{"test_default2.*"},
PartitionRule: "unknown-dispatcher",
},
{
Matcher: []string{"test_table.*"},
PartitionRule: "table",
TopicRule: "hello_{schema}_world",
},
{
Matcher: []string{"test_index_value.*"},
PartitionRule: "index-value",
TopicRule: "{schema}_world",
},
{
Matcher: []string{"test.*"},
PartitionRule: "rowid",
TopicRule: "hello_{schema}",
},
{
Matcher: []string{"*.*", "!*.test"},
PartitionRule: "ts",
TopicRule: "{schema}_{table}",
},
},
},
}, "test")
require.Nil(t, err)

p := d.GetPartition(&model.RowChangedEvent{
Table: &model.TableName{Schema: "test_default1", Table: "table"},
Columns: []*model.Column{
{
Name: "id",
Value: 1,
Flag: model.HandleKeyFlag | model.PrimaryKeyFlag,
},
},
IndexColumns: [][]int{{0}},
}, 16)
require.Equal(t, int32(10), p)
p = d.GetPartition(&model.RowChangedEvent{
Table: &model.TableName{Schema: "test_default2", Table: "table"},
Columns: []*model.Column{
{
Name: "id",
Value: 1,
Flag: model.HandleKeyFlag | model.PrimaryKeyFlag,
},
},
IndexColumns: [][]int{{0}},
}, 16)
require.Equal(t, int32(4), p)

p = d.GetPartition(&model.RowChangedEvent{
Table: &model.TableName{Schema: "test_table", Table: "table"},
CommitTs: 1,
}, 16)
require.Equal(t, int32(15), p)
p = d.GetPartition(&model.RowChangedEvent{
Table: &model.TableName{Schema: "test_index_value", Table: "table"},
Columns: []*model.Column{
{
Name: "a",
Value: 11,
Flag: model.HandleKeyFlag,
}, {
Name: "b",
Value: 22,
Flag: 0,
},
},
}, 10)
require.Equal(t, int32(1), p)
p = d.GetPartition(&model.RowChangedEvent{
Table: &model.TableName{Schema: "a", Table: "table"},
CommitTs: 1,
}, 2)
require.Equal(t, int32(1), p)
}
Loading

0 comments on commit 7b955b5

Please sign in to comment.