Skip to content

Commit

Permalink
sink(ticdc): Enhance the topic expression to allow more flexible dispatching (#5377)
Browse files Browse the repository at this point in the history

ref #4423
  • Loading branch information
zhaoxinyu authored May 12, 2022
1 parent 6b07e48 commit fd48f9d
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 18 deletions.
18 changes: 9 additions & 9 deletions cdc/sink/mq/dispatcher/topic/expression.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@ package topic

import (
"regexp"
"strings"

"github.com/pingcap/tiflow/pkg/errors"
)

var (
// topicNameRE is used to match a valid topic expression
topicNameRE = regexp.MustCompile(`^[A-Za-z0-9\._\-]*\{schema\}[A-Za-z0-9\._\-]*$|^\{schema\}_\{table\}$`)
topicNameRE = regexp.MustCompile(
`^[A-Za-z0-9\._\-]*\{schema\}([A-Za-z0-9\._\-]*\{table\})?[A-Za-z0-9\._\-]*$`,
)
// kafkaForbidRE is used to reject the characters which are forbidden in kafka topic name
kafkaForbidRE = regexp.MustCompile(`[^a-zA-Z0-9\._\-]`)
// schemaRE is used to match substring '{schema}' in topic expression
Expand All @@ -36,9 +37,9 @@ var (
const kafkaTopicNameMaxLength = 249

// Expression represent a kafka topic expression.
// Only two types of expression are allowed:
// 1. [prefix]{schema}[suffix], the prefix/suffix is optional and matches [A-Za-z0-9\._\-]*
// 2. {schema}_{table}
// The expression must have the form [prefix]{schema}[middle][{table}][suffix].
// prefix, middle and suffix are optional and must each match [A-Za-z0-9\._\-]*;
// {table} is optional as well.
type Expression string

// Validate checks whether a kafka topic name is valid or not.
Expand All @@ -55,10 +56,9 @@ func (e Expression) Validate() error {
// When doing conversion, the special characters other than [A-Za-z0-9\._\-] in schema/table
// will be replaced with an underscore '_'.
func (e Expression) Substitute(schema, table string) string {
// the upper case letters in schema/table will be converted to lower case,
// and some of the special characters will be replaced with '_'
replacedSchema := kafkaForbidRE.ReplaceAllString(strings.ToLower(schema), "_")
replacedTable := kafkaForbidRE.ReplaceAllString(strings.ToLower(table), "_")
// characters other than [A-Za-z0-9\._\-] are replaced with '_'; letter case is kept as-is
replacedSchema := kafkaForbidRE.ReplaceAllString(schema, "_")
replacedTable := kafkaForbidRE.ReplaceAllString(table, "_")

topicExpr := string(e)
// doing the real conversion things
Expand Down
48 changes: 39 additions & 9 deletions cdc/sink/mq/dispatcher/topic/expression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,14 @@ func TestSubstituteTopicExpression(t *testing.T) {
name: "valid expression containing '{schema}', schema is converted to lower case letters",
schema: "DEF",
table: "",
expected: "abcdef",
expected: "abcDEF",
},
{
name: "valid expression containing '{schema}' and special prefix, schema is converted to lower case letters",
expression: "abc._-def{schema}abc",
schema: "HELLO",
table: "",
expected: "abc._-defhelloabc",
expected: "abc._-defHELLOabc",
},
{
name: "valid expression containing '{schema}', the kafka disallowed characters in schema are replaced with underscore '_'",
Expand Down Expand Up @@ -126,48 +126,78 @@ func TestSubstituteTopicExpression(t *testing.T) {
expected: "",
},
{
name: "valid expression containing '{schema}_{table}'",
name: "valid expression containing '{schema}' and '{table}'," +
" without prefix or suffix",
expression: "{schema}_{table}",
schema: "hello",
table: "world",
expected: "hello_world",
},
{
name: "valid expression containing '{schema}_{table}', schema/table are converted to lower case letters",
name: "valid expression containing '{schema}' and '{table}'," +
" schema/table are converted to lower case letters",
expression: "{schema}_{table}",
schema: "HELLO",
table: "WORLD",
expected: "hello_world",
expected: "HELLO_WORLD",
},
{
name: "valid expression containing '{schema}_{table}', the kafka disallowed characters in table are replaced",
name: "valid expression containing '{schema}' and '{table}'," +
" the kafka disallowed characters in table are replaced",
expression: "{schema}_{table}",
schema: "hello",
table: "!@#$%^&*",
expected: "hello_________",
},
{
name: "valid expression containing '{schema}_{table}', the kafka disallowed characters in schema are replaced",
name: "valid expression containing '{schema}' and '{table}'," +
" the kafka disallowed characters in schema are replaced",
expression: "{schema}_{table}",
schema: "()_+.{}",
table: "world",
expected: "____.___world",
},
{
name: "valid expression containing '{schema}_{table}', the kafka disallowed characters in schema and table are replaced",
name: "valid expression containing '{schema}' and '{table}'," +
" with both prefix and suffix",
expression: "ab.-c_{schema}_{table}_de.-f",
schema: "hello",
table: "WORLD",
expected: "ab.-c_hello_WORLD_de.-f",
},
{
name: "valid expression containing '{schema}' and '{table}'," +
" with customized middle delimited string",
expression: "abc_{schema}de._-{table}_fhi",
schema: "hello",
table: "world",
expected: "abc_hellode._-world_fhi",
},
{
name: "valid expression containing '{schema}' and '{table}'," +
" the kafka disallowed characters in schema and table are replaced",
expression: "{schema}_{table}",
schema: "你好",
table: "世界",
expected: "_____",
},
{
name: "invalid expression not containing '{schema}_{table}'",
name: "invalid expression not containing '{schema}' and '{table}'",
expression: "{sch}_{tab}",
schema: "hello",
table: "world",
wantErr: "invalid topic expression",
expected: "",
},
{
name: "invalid expression containing '{schema}' and '{table}'," +
" the middle delimited string must be [A-Za-z0-9\\._\\-]*",
expression: "{schema}!@#$%^&*()+{}你好{table}",
schema: "hello",
table: "world",
wantErr: "invalid topic expression",
expected: "",
},
{
name: "invalid topic name '.'",
expression: "{schema}",
Expand Down

0 comments on commit fd48f9d

Please sign in to comment.