diff --git a/cdc/sink/codec/avro.go b/cdc/sink/codec/avro.go index f69feb8e522..2e491a71f63 100644 --- a/cdc/sink/codec/avro.go +++ b/cdc/sink/codec/avro.go @@ -18,6 +18,7 @@ import ( "context" "encoding/binary" "encoding/json" + "fmt" "math/big" "strconv" "strings" @@ -181,11 +182,11 @@ func (a *AvroEventBatchEncoder) avroEncode( } } - var fqdn string = e.Table.Schema + "." + e.Table.Table + qualifiedName := getQualifiedNameFromTableName(e.Table) schemaGen := func() (string, error) { schema, err := rowToAvroSchema( - fqdn, + qualifiedName, cols, colInfos, enableTiDBExtension, @@ -200,7 +201,7 @@ func (a *AvroEventBatchEncoder) avroEncode( avroCodec, registryID, err := schemaManager.GetCachedOrRegister( ctx, - fqdn, + qualifiedName, e.TableInfoVersion, schemaGen, ) @@ -289,6 +290,59 @@ func getTiDBTypeFromColumn(col *model.Column) string { return tt } +const ( + replacementChar = "_" + numberPrefix = "_" +) + +// debezium-core/src/main/java/io/debezium/schema/FieldNameSelector.java +// https://avro.apache.org/docs/current/spec.html#names +func sanitizeColumnName(name string) string { + changed := false + var sb strings.Builder + for i, c := range name { + if i == 0 && (c >= '0' && c <= '9') { + sb.WriteString(numberPrefix) + sb.WriteRune(c) + changed = true + } else if !(c == '_' || + ('a' <= c && c <= 'z') || + ('A' <= c && c <= 'Z') || + ('0' <= c && c <= '9')) { + sb.WriteString(replacementChar) + changed = true + } else { + sb.WriteRune(c) + } + } + + sanitizedName := sb.String() + if changed { + log.Warn( + fmt.Sprintf( + "Field '%s' name potentially not safe for serialization, replaced with '%s'", + name, + sanitizedName, + ), + ) + } + return sanitizedName +} + +// https://github.com/debezium/debezium/blob/9f7ede0e0695f012c6c4e715e96aed85eecf6b5f \ +// /debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/antlr/ \ +// MySqlAntlrDdlParser.java#L374 +func escapeEnumAndSetOptions(option string) string { + option = strings.ReplaceAll(option, ",", "\\,") + option = strings.ReplaceAll(option, "\\'", "'") + option = strings.ReplaceAll(option, "''", "'") + return option +} + +func getQualifiedNameFromTableName(tableName *model.TableName) string { + return tableName.Schema + "." + tableName.Table +} + type avroSchema struct { Type string `json:"type"` Parameters map[string]string `json:"connect.parameters"` @@ -302,7 +356,7 @@ type avroLogicalTypeSchema struct { } func rowToAvroSchema( - fqdn string, + qualifiedName string, columnInfo []*model.Column, colInfos []rowcodec.ColInfo, enableTiDBExtension bool, @@ -311,7 +365,7 @@ func rowToAvroSchema( ) (string, error) { top := avroSchemaTop{ Tp: "record", - Name: fqdn, + Name: qualifiedName, Fields: nil, } @@ -326,7 +380,7 @@ func rowToAvroSchema( return "", err } field := make(map[string]interface{}) - field["name"] = col.Name + field["name"] = sanitizeColumnName(col.Name) if col.Flag.IsNullable() { field["type"] = []interface{}{"null", avroType} field["default"] = nil @@ -388,9 +442,9 @@ func rowToAvroData( // https://pkg.go.dev/github.com/linkedin/goavro/v2#Union if col.Flag.IsNullable() { - ret[col.Name] = goavro.Union(str, data) + ret[sanitizeColumnName(col.Name)] = goavro.Union(str, data) } else { - ret[col.Name] = data + ret[sanitizeColumnName(col.Name)] = data } } @@ -503,7 +557,7 @@ func columnToAvroSchema( case mysql.TypeEnum, mysql.TypeSet: es := make([]string, 0, len(ft.Elems)) for _, e := range ft.Elems { - e = strings.ReplaceAll(e, ",", "\\,") + e = escapeEnumAndSetOptions(e) es = append(es, e) } return avroSchema{ diff --git a/cdc/sink/codec/avro_test.go b/cdc/sink/codec/avro_test.go index 8298c1ff38c..3b6188afd9a 100644 --- a/cdc/sink/codec/avro_test.go +++ b/cdc/sink/codec/avro_test.go @@ -633,7 +633,7 @@ func TestRowToAvroSchema(t *testing.T) { Schema: "testdb", Table: "rowtoavroschema", } - fqdn := table.Schema + "." + table.Table + qualifiedName := getQualifiedNameFromTableName(&table) var cols []*model.Column = make([]*model.Column, 0) var colInfos []rowcodec.ColInfo = make([]rowcodec.ColInfo, 0) @@ -648,13 +648,13 @@ func TestRowToAvroSchema(t *testing.T) { colInfos = append(colInfos, v.colInfo) } - schema, err := rowToAvroSchema(fqdn, cols, colInfos, false, "precise", "long") + schema, err := rowToAvroSchema(qualifiedName, cols, colInfos, false, "precise", "long") require.NoError(t, err) require.Equal(t, expectedSchemaWithoutExtension, indentJSON(schema)) _, err = goavro.NewCodec(schema) require.NoError(t, err) - schema, err = rowToAvroSchema(fqdn, cols, colInfos, true, "precise", "long") + schema, err = rowToAvroSchema(qualifiedName, cols, colInfos, true, "precise", "long") require.NoError(t, err) require.Equal(t, expectedSchemaWithExtension, indentJSON(schema)) _, err = goavro.NewCodec(schema) @@ -833,3 +833,16 @@ func TestAvroEnvelope(t *testing.T) { require.True(t, exists) require.Equal(t, int32(7), id) } + +func TestSanitizeColumnName(t *testing.T) { + t.Parallel() + + require.Equal(t, "normalColumnName123", sanitizeColumnName("normalColumnName123")) + require.Equal( + t, + "_1ColumnNameStartWithNumber", + sanitizeColumnName("1ColumnNameStartWithNumber"), + ) + require.Equal(t, "A_B", sanitizeColumnName("A.B")) + require.Equal(t, "columnNameWith__", sanitizeColumnName("columnNameWith中文")) +} diff --git a/cdc/sink/codec/schema_registry.go b/cdc/sink/codec/schema_registry.go index 148e25bce8f..157e561e17b 100644 --- a/cdc/sink/codec/schema_registry.go +++ b/cdc/sink/codec/schema_registry.go @@ -125,7 +125,7 @@ var regexRemoveSpaces = regexp.MustCompile(`\s`) // Register a schema in schema registry, no cache func (m *AvroSchemaManager) Register( ctx context.Context, - fqdn string, + qualifiedName string, codec *goavro.Codec, ) (int, error) { // The Schema Registry expects the JSON to be without newline characters @@ -143,7 +143,7 @@ func (m *AvroSchemaManager) Register( ) } uri := m.registryURL + "/subjects/" + url.QueryEscape( - m.tableNameToSchemaSubject(fqdn), + m.tableNameToSchemaSubject(qualifiedName), ) + "/versions" log.Debug("Registering schema", zap.String("uri", uri), zap.ByteString("payload", payload)) @@ -217,10 +217,10 @@ func (m *AvroSchemaManager) Register( // NOT USED for now, reserved for future use. func (m *AvroSchemaManager) Lookup( ctx context.Context, - fqdn string, + qualifiedName string, tiSchemaID uint64, ) (*goavro.Codec, int, error) { - key := m.tableNameToSchemaSubject(fqdn) + key := m.tableNameToSchemaSubject(qualifiedName) m.cacheRWLock.RLock() if entry, exists := m.cache[key]; exists && entry.tiSchemaID == tiSchemaID { log.Info("Avro schema lookup cache hit", @@ -336,11 +336,11 @@ type SchemaGenerator func() (string, error) // cache is out-of-sync with schema registry, we could reload it. func (m *AvroSchemaManager) GetCachedOrRegister( ctx context.Context, - fqdn string, + qualifiedName string, tiSchemaID uint64, schemaGen SchemaGenerator, ) (*goavro.Codec, int, error) { - key := m.tableNameToSchemaSubject(fqdn) + key := m.tableNameToSchemaSubject(qualifiedName) m.cacheRWLock.RLock() if entry, exists := m.cache[key]; exists && entry.tiSchemaID == tiSchemaID { log.Debug("Avro schema GetCachedOrRegister cache hit", @@ -372,7 +372,7 @@ func (m *AvroSchemaManager) GetCachedOrRegister( ) } - id, err := m.Register(ctx, fqdn, codec) + id, err := m.Register(ctx, qualifiedName, codec) if err != nil { return nil, 0, errors.Annotate( cerror.WrapError( @@ -403,8 +403,8 @@ func (m *AvroSchemaManager) GetCachedOrRegister( // ClearRegistry clears the Registry subject for the given table. Should be idempotent. // Exported for testing. // NOT USED for now, reserved for future use. -func (m *AvroSchemaManager) ClearRegistry(ctx context.Context, fqdn string) error { - uri := m.registryURL + "/subjects/" + url.QueryEscape(m.tableNameToSchemaSubject(fqdn)) +func (m *AvroSchemaManager) ClearRegistry(ctx context.Context, qualifiedName string) error { + uri := m.registryURL + "/subjects/" + url.QueryEscape(m.tableNameToSchemaSubject(qualifiedName)) req, err := http.NewRequestWithContext(ctx, "DELETE", uri, nil) if err != nil { log.Error("Could not construct request for clearRegistry", zap.String("uri", uri)) @@ -498,9 +498,9 @@ func httpRetry( return resp, nil } -func (m *AvroSchemaManager) tableNameToSchemaSubject(fqdn string) string { +func (m *AvroSchemaManager) tableNameToSchemaSubject(qualifiedName string) string { // obey the RecordNameStrategy but generate a global unique subject // https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html \ // #subject-name-strategy - return fqdn + m.subjectSuffix + return qualifiedName + m.subjectSuffix } diff --git a/cdc/sink/codec/schema_registry_test.go b/cdc/sink/codec/schema_registry_test.go index ff9b83093b7..9edbdcbb836 100644 --- a/cdc/sink/codec/schema_registry_test.go +++ b/cdc/sink/codec/schema_registry_test.go @@ -177,12 +177,12 @@ func TestSchemaRegistry(t *testing.T) { ) require.NoError(t, err) - fqdn := table.Schema + "." + table.Table + qualifiedName := getQualifiedNameFromTableName(&table) - err = manager.ClearRegistry(getTestingContext(), fqdn) + err = manager.ClearRegistry(getTestingContext(), qualifiedName) require.NoError(t, err) - _, _, err = manager.Lookup(getTestingContext(), fqdn, 1) + _, _, err = manager.Lookup(getTestingContext(), qualifiedName, 1) require.Regexp(t, `.*not\sfound.*`, err) codec, err := goavro.NewCodec(`{ @@ -198,12 +198,12 @@ func TestSchemaRegistry(t *testing.T) { }`) require.NoError(t, err) - _, err = manager.Register(getTestingContext(), fqdn, codec) + _, err = manager.Register(getTestingContext(), qualifiedName, codec) require.NoError(t, err) var id int for i := 0; i < 2; i++ { - _, id, err = manager.Lookup(getTestingContext(), fqdn, 1) + _, id, err = manager.Lookup(getTestingContext(), qualifiedName, 1) require.NoError(t, err) require.Greater(t, id, 0) } @@ -228,10 +228,10 @@ func TestSchemaRegistry(t *testing.T) { ] }`) require.NoError(t, err) - _, err = manager.Register(getTestingContext(), fqdn, codec) + _, err = manager.Register(getTestingContext(), qualifiedName, codec) require.NoError(t, err) - codec2, id2, err := manager.Lookup(getTestingContext(), fqdn, 999) + codec2, id2, err := manager.Lookup(getTestingContext(), qualifiedName, 999) require.NoError(t, err) require.NotEqual(t, id, id2) require.Equal(t, codec.CanonicalSchema(), codec2.CanonicalSchema()) @@ -255,7 +255,7 @@ func TestSchemaRegistryIdempotent(t *testing.T) { Schema: "testdb", Table: "test", } - fqdn := table.Schema + "." + table.Table + qualifiedName := getQualifiedNameFromTableName(&table) manager, err := NewAvroSchemaManager( getTestingContext(), @@ -265,7 +265,7 @@ func TestSchemaRegistryIdempotent(t *testing.T) { ) require.NoError(t, err) for i := 0; i < 20; i++ { - err = manager.ClearRegistry(getTestingContext(), fqdn) + err = manager.ClearRegistry(getTestingContext(), qualifiedName) require.NoError(t, err) } @@ -292,7 +292,7 @@ func TestSchemaRegistryIdempotent(t *testing.T) { id := 0 for i := 0; i < 20; i++ { - id1, err := manager.Register(getTestingContext(), fqdn, codec) + id1, err := manager.Register(getTestingContext(), qualifiedName, codec) require.NoError(t, err) require.True(t, id == 0 || id == id1) id = id1 @@ -341,20 +341,20 @@ func TestGetCachedOrRegister(t *testing.T) { ] }`, nil } - fqdn := table.Schema + "." + table.Table + qualifiedName := getQualifiedNameFromTableName(&table) - codec, id, err := manager.GetCachedOrRegister(getTestingContext(), fqdn, 1, schemaGen) + codec, id, err := manager.GetCachedOrRegister(getTestingContext(), qualifiedName, 1, schemaGen) require.NoError(t, err) require.Greater(t, id, 0) require.NotNil(t, codec) require.Equal(t, 1, called) - codec1, _, err := manager.GetCachedOrRegister(getTestingContext(), fqdn, 1, schemaGen) + codec1, _, err := manager.GetCachedOrRegister(getTestingContext(), qualifiedName, 1, schemaGen) require.NoError(t, err) require.True(t, codec == codec1) // check identity require.Equal(t, 1, called) - codec2, _, err := manager.GetCachedOrRegister(getTestingContext(), fqdn, 2, schemaGen) + codec2, _, err := manager.GetCachedOrRegister(getTestingContext(), qualifiedName, 2, schemaGen) require.NoError(t, err) require.NotEqual(t, codec, codec2) require.Equal(t, 2, called) @@ -390,7 +390,7 @@ func TestGetCachedOrRegister(t *testing.T) { for j := 0; j < 100; j++ { codec, id, err := manager.GetCachedOrRegister( getTestingContext(), - fqdn, + qualifiedName, uint64(finalI), schemaGen, )