From f95e1e93d3c0ddf51b7601d69f1e3e9474859c0d Mon Sep 17 00:00:00 2001 From: gongzhongqiang <764629910@qq.com> Date: Wed, 30 Aug 2023 00:22:35 +0800 Subject: [PATCH 1/3] [hotfix] [base] fix TableId.java use identifier decode error with four quota --- .../base/source/meta/split/SourceSplitSerializer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/split/SourceSplitSerializer.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/split/SourceSplitSerializer.java index 140b026768..378ea21cfb 100644 --- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/split/SourceSplitSerializer.java +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/split/SourceSplitSerializer.java @@ -70,7 +70,7 @@ public byte[] serialize(SourceSplitBase split) throws IOException { boolean useCatalogBeforeSchema = SerializerUtils.shouldUseCatalogBeforeSchema(snapshotSplit.getTableId()); out.writeBoolean(useCatalogBeforeSchema); - out.writeUTF(snapshotSplit.getTableId().toString()); + out.writeUTF(snapshotSplit.getTableId().toDoubleQuotedString()); out.writeUTF(snapshotSplit.splitId()); out.writeUTF(snapshotSplit.getSplitKeyType().asSerializableString()); From 51916dca7a40766a711820396113d0a6040dd675 Mon Sep 17 00:00:00 2001 From: gongzhongqiang <764629910@qq.com> Date: Wed, 6 Sep 2023 17:36:07 +0800 Subject: [PATCH 2/3] [hotfix] [base] fix TableId.java use identifier decode error with four quota --- .../meta/split/SourceSplitSerializerTest.java | 96 +++++++++++++++++++ 1 file changed, 96 insertions(+) create mode 100644 flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/source/meta/split/SourceSplitSerializerTest.java diff --git a/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/source/meta/split/SourceSplitSerializerTest.java b/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/source/meta/split/SourceSplitSerializerTest.java new file mode 100644 index 0000000000..40dc70e95c --- /dev/null +++ b/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/source/meta/split/SourceSplitSerializerTest.java @@ -0,0 +1,96 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.base.source.meta.split; + +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.RowType; + +import com.ververica.cdc.connectors.base.source.meta.offset.Offset; +import com.ververica.cdc.connectors.base.source.meta.offset.OffsetFactory; +import io.debezium.relational.TableId; +import org.junit.Test; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.*; + +public class SourceSplitSerializerTest { + + @Test + public void testSnapshotTableIdSerializeAndDeserialize() throws IOException { + SnapshotSplit snapshotSplitBefore = + new SnapshotSplit( + new TableId("cata`log\"", "s\"che`ma", "ta\"ble.1`"), + "test", + new RowType( + Collections.singletonList( + new RowType.RowField("id", new BigIntType()))), + null, + null, + null, + new HashMap<>()); + + SourceSplitSerializer sourceSplitSerializer = + new SourceSplitSerializer() { + @Override + public OffsetFactory getOffsetFactory() { + return new OffsetFactory() { + @Override + public Offset newOffset(Map offset) { + return null; + } + + @Override + public Offset newOffset(String filename, Long position) { + return null; + } + + @Override + public Offset newOffset(Long position) { + return null; + } + + @Override + public Offset createTimestampOffset(long timestampMillis) { + return null; + } + + @Override + public Offset createInitialOffset() { + return null; + } + + @Override + public Offset createNoStoppingOffset() { + return null; + } + }; + } + }; + + SnapshotSplit snapshotSplitAfter = + (SnapshotSplit) + sourceSplitSerializer.deserialize( + sourceSplitSerializer.getVersion(), + sourceSplitSerializer.serialize(snapshotSplitBefore)); + + assertEquals(snapshotSplitBefore.getTableId(), snapshotSplitAfter.getTableId()); + } +} From f5d6a4f53b728fe44ef163ce41ef25b93145729b Mon Sep 17 00:00:00 2001 From: GOODBOY008 Date: Wed, 6 Sep 2023 17:40:45 +0800 Subject: [PATCH 3/3] [hotfix] [base] fix TableId.java use identifier decode error with four quota --- .../base/source/meta/split/SourceSplitSerializerTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/source/meta/split/SourceSplitSerializerTest.java b/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/source/meta/split/SourceSplitSerializerTest.java index 40dc70e95c..978c13804e 100644 --- a/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/source/meta/split/SourceSplitSerializerTest.java +++ b/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/source/meta/split/SourceSplitSerializerTest.java @@ -29,8 +29,9 @@ import java.util.HashMap; import java.util.Map; -import static org.junit.jupiter.api.Assertions.*; +import static org.junit.Assert.assertEquals; +/** Tests for {@link SourceSplitSerializer}. */ public class SourceSplitSerializerTest { @Test