From e4a2ff1ad3a96b177de923bd3cfda896b21863ae Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Tue, 20 Aug 2024 00:56:50 +0800 Subject: [PATCH] [FLINK-35243][cdc-common] Extends more schema change event types support This close #3521. --- .../common/event/AlterColumnTypeEvent.java | 82 ++++- .../cdc/common/event/DropTableEvent.java | 71 +++++ .../cdc/common/event/RenameColumnEvent.java | 3 + .../common/event/SchemaChangeEventType.java | 12 +- .../event/SchemaChangeEventTypeFamily.java | 37 ++- .../event/SchemaChangeEventWithPreSchema.java | 37 +++ .../cdc/common/event/TruncateTableEvent.java | 71 +++++ .../event/visitor/AddColumnEventVisitor.java | 28 ++ .../visitor/AlterColumnTypeEventVisitor.java | 28 ++ .../visitor/CreateTableEventVisitor.java | 28 ++ .../event/visitor/DropColumnEventVisitor.java | 28 ++ .../event/visitor/DropTableEventVisitor.java | 28 ++ .../visitor/RenameColumnEventVisitor.java | 28 ++ .../visitor/SchemaChangeEventVisitor.java | 83 +++++ .../visitor/TruncateTableEventVisitor.java | 28 ++ .../cdc/common/utils/ChangeEventUtils.java | 43 ++- .../flink/cdc/common/utils/SchemaUtils.java | 24 +- .../common/utils/ChangeEventUtilsTest.java | 78 +++-- .../cdc/common/utils/SchemaUtilsTest.java | 2 +- .../flink/FlinkPipelineComposerITCase.java | 4 +- .../doris/sink/DorisMetadataApplier.java | 195 +++++++----- .../CustomAlterTableParserListener.java | 21 ++ .../mysql/source/MySqlPipelineITCase.java | 176 +++++++++++ .../paimon/sink/PaimonMetadataApplier.java | 295 ++++++++++-------- .../sink/StarRocksMetadataApplier.java | 44 ++- .../cdc/connectors/values/ValuesDatabase.java | 20 ++ .../cdc/pipeline/tests/MysqlE2eITCase.java | 247 ++++++++++----- .../cdc/pipeline/tests/RouteE2eITCase.java | 14 +- .../pipeline/tests/SchemaEvolveE2eITCase.java | 48 ++- .../schema/coordinator/SchemaDerivation.java | 71 +++-- .../SchemaRegistryRequestHandler.java | 21 +- .../transform/PreTransformOperator.java | 12 +- 
.../event/AlterColumnTypeEventSerializer.java | 9 +- .../event/DropTableEventSerializer.java | 108 +++++++ .../event/SchemaChangeEventSerializer.java | 105 ++++--- .../event/TruncateTableEventSerializer.java | 108 +++++++ .../operators/schema/SchemaEvolveTest.java | 28 +- .../AlterColumnTypeEventSerializerTest.java | 9 +- .../event/DropTableEventSerializerTest.java | 50 +++ .../TruncateTableEventSerializerTest.java | 51 +++ 40 files changed, 1869 insertions(+), 506 deletions(-) create mode 100644 flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/DropTableEvent.java create mode 100644 flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/SchemaChangeEventWithPreSchema.java create mode 100644 flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/TruncateTableEvent.java create mode 100644 flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/AddColumnEventVisitor.java create mode 100644 flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/AlterColumnTypeEventVisitor.java create mode 100644 flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/CreateTableEventVisitor.java create mode 100644 flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/DropColumnEventVisitor.java create mode 100644 flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/DropTableEventVisitor.java create mode 100644 flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/RenameColumnEventVisitor.java create mode 100644 flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/SchemaChangeEventVisitor.java create mode 100644 flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/TruncateTableEventVisitor.java create mode 100644 flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/event/DropTableEventSerializer.java create mode 100644 
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/event/TruncateTableEventSerializer.java create mode 100644 flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/event/DropTableEventSerializerTest.java create mode 100644 flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/event/TruncateTableEventSerializerTest.java diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/AlterColumnTypeEvent.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/AlterColumnTypeEvent.java index a5a104140f..51acb43198 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/AlterColumnTypeEvent.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/AlterColumnTypeEvent.java @@ -17,16 +17,23 @@ package org.apache.flink.cdc.common.event; +import org.apache.flink.cdc.common.annotation.PublicEvolving; +import org.apache.flink.cdc.common.schema.Column; +import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.types.DataType; +import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; /** * A {@link SchemaChangeEvent} that represents an {@code ALTER COLUMN} DDL, which may contain the * lenient column type changes. */ -public class AlterColumnTypeEvent implements SchemaChangeEvent { +@PublicEvolving +public class AlterColumnTypeEvent implements SchemaChangeEventWithPreSchema, SchemaChangeEvent { private static final long serialVersionUID = 1L; @@ -35,9 +42,21 @@ public class AlterColumnTypeEvent implements SchemaChangeEvent { /** key => column name, value => column type after changing. 
*/ private final Map typeMapping; + private final Map oldTypeMapping; + public AlterColumnTypeEvent(TableId tableId, Map typeMapping) { this.tableId = tableId; this.typeMapping = typeMapping; + this.oldTypeMapping = new HashMap<>(); + } + + public AlterColumnTypeEvent( + TableId tableId, + Map typeMapping, + Map oldTypeMapping) { + this.tableId = tableId; + this.typeMapping = typeMapping; + this.oldTypeMapping = oldTypeMapping; } /** Returns the type mapping. */ @@ -55,22 +74,34 @@ public boolean equals(Object o) { } AlterColumnTypeEvent that = (AlterColumnTypeEvent) o; return Objects.equals(tableId, that.tableId) - && Objects.equals(typeMapping, that.typeMapping); + && Objects.equals(typeMapping, that.typeMapping) + && Objects.equals(oldTypeMapping, that.oldTypeMapping); } @Override public int hashCode() { - return Objects.hash(tableId, typeMapping); + return Objects.hash(tableId, typeMapping, oldTypeMapping); } @Override public String toString() { - return "AlterColumnTypeEvent{" - + "tableId=" - + tableId - + ", nameMapping=" - + typeMapping - + '}'; + if (hasPreSchema()) { + return "AlterColumnTypeEvent{" + + "tableId=" + + tableId + + ", typeMapping=" + + typeMapping + + ", oldTypeMapping=" + + oldTypeMapping + + '}'; + } else { + return "AlterColumnTypeEvent{" + + "tableId=" + + tableId + + ", typeMapping=" + + typeMapping + + '}'; + } } @Override @@ -78,6 +109,39 @@ public TableId tableId() { return tableId; } + public Map getOldTypeMapping() { + return oldTypeMapping; + } + + @Override + public boolean hasPreSchema() { + return !oldTypeMapping.isEmpty(); + } + + @Override + public void fillPreSchema(Schema oldTypeSchema) { + oldTypeMapping.clear(); + oldTypeMapping.putAll( + oldTypeSchema.getColumns().stream() + .filter(e -> typeMapping.containsKey(e.getName()) && e.getType() != null) + .collect(Collectors.toMap(Column::getName, Column::getType))); + } + + @Override + public boolean trimRedundantChanges() { + if (hasPreSchema()) { + Set 
redundantlyChangedColumns = + typeMapping.keySet().stream() + .filter(e -> Objects.equals(typeMapping.get(e), oldTypeMapping.get(e))) + .collect(Collectors.toSet()); + + // Remove redundant alter column type records that doesn't really change the type + typeMapping.keySet().removeAll(redundantlyChangedColumns); + oldTypeMapping.keySet().removeAll(redundantlyChangedColumns); + } + return !typeMapping.isEmpty(); + } + @Override public SchemaChangeEventType getType() { return SchemaChangeEventType.ALTER_COLUMN_TYPE; diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/DropTableEvent.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/DropTableEvent.java new file mode 100644 index 0000000000..dd5efdd081 --- /dev/null +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/DropTableEvent.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.common.event; + +import org.apache.flink.cdc.common.annotation.PublicEvolving; +import org.apache.flink.cdc.common.source.DataSource; + +import java.util.Objects; + +/** + * A {@link SchemaChangeEvent} that represents an {@code DROP TABLE} DDL. 
This will be sent by + * {@link DataSource} before all {@link DataChangeEvent} with the same tableId. + */ +@PublicEvolving +public class DropTableEvent implements SchemaChangeEvent { + + private static final long serialVersionUID = 1L; + + private final TableId tableId; + + public DropTableEvent(TableId tableId) { + this.tableId = tableId; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof DropTableEvent)) { + return false; + } + DropTableEvent that = (DropTableEvent) o; + return Objects.equals(tableId, that.tableId); + } + + @Override + public int hashCode() { + return Objects.hash(tableId); + } + + @Override + public String toString() { + return "DropTableEvent{" + "tableId=" + tableId + '}'; + } + + @Override + public TableId tableId() { + return tableId; + } + + @Override + public SchemaChangeEventType getType() { + return SchemaChangeEventType.DROP_TABLE; + } +} diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/RenameColumnEvent.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/RenameColumnEvent.java index 4558c1508c..8bde378720 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/RenameColumnEvent.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/RenameColumnEvent.java @@ -17,6 +17,8 @@ package org.apache.flink.cdc.common.event; +import org.apache.flink.cdc.common.annotation.PublicEvolving; + import java.util.Map; import java.util.Objects; @@ -24,6 +26,7 @@ * A {@link SchemaChangeEvent} that represents an {@code RENAME COLUMN} DDL, which may contain the * lenient column type changes.
*/ +@PublicEvolving public class RenameColumnEvent implements SchemaChangeEvent { private static final long serialVersionUID = 1L; diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/SchemaChangeEventType.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/SchemaChangeEventType.java index 668ea76a4d..8132c29a3f 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/SchemaChangeEventType.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/SchemaChangeEventType.java @@ -26,7 +26,9 @@ public enum SchemaChangeEventType { ALTER_COLUMN_TYPE, CREATE_TABLE, DROP_COLUMN, - RENAME_COLUMN; + DROP_TABLE, + RENAME_COLUMN, + TRUNCATE_TABLE; public static SchemaChangeEventType ofEvent(SchemaChangeEvent event) { if (event instanceof AddColumnEvent) { @@ -37,8 +39,12 @@ public static SchemaChangeEventType ofEvent(SchemaChangeEvent event) { return CREATE_TABLE; } else if (event instanceof DropColumnEvent) { return DROP_COLUMN; + } else if (event instanceof DropTableEvent) { + return DROP_TABLE; } else if (event instanceof RenameColumnEvent) { return RENAME_COLUMN; + } else if (event instanceof TruncateTableEvent) { + return TRUNCATE_TABLE; } else { throw new RuntimeException("Unknown schema change event type: " + event.getClass()); } @@ -54,8 +60,12 @@ public static SchemaChangeEventType ofTag(String tag) { return CREATE_TABLE; case "drop.column": return DROP_COLUMN; + case "drop.table": + return DROP_TABLE; case "rename.column": return RENAME_COLUMN; + case "truncate.table": + return TRUNCATE_TABLE; default: throw new RuntimeException("Unknown schema change event type: " + tag); } diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/SchemaChangeEventTypeFamily.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/SchemaChangeEventTypeFamily.java index 5ff7187f61..c1adfd7161 100644 --- 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/SchemaChangeEventTypeFamily.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/SchemaChangeEventTypeFamily.java @@ -19,6 +19,14 @@ import org.apache.flink.cdc.common.annotation.PublicEvolving; +import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ADD_COLUMN; +import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ALTER_COLUMN_TYPE; +import static org.apache.flink.cdc.common.event.SchemaChangeEventType.CREATE_TABLE; +import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_COLUMN; +import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_TABLE; +import static org.apache.flink.cdc.common.event.SchemaChangeEventType.RENAME_COLUMN; +import static org.apache.flink.cdc.common.event.SchemaChangeEventType.TRUNCATE_TABLE; + /** * An enumeration of schema change event families for clustering {@link SchemaChangeEvent}s into * categories. @@ -26,31 +34,30 @@ @PublicEvolving public class SchemaChangeEventTypeFamily { - public static final SchemaChangeEventType[] ADD = {SchemaChangeEventType.ADD_COLUMN}; + public static final SchemaChangeEventType[] ADD = {ADD_COLUMN}; - public static final SchemaChangeEventType[] ALTER = {SchemaChangeEventType.ALTER_COLUMN_TYPE}; + public static final SchemaChangeEventType[] ALTER = {ALTER_COLUMN_TYPE}; - public static final SchemaChangeEventType[] CREATE = {SchemaChangeEventType.CREATE_TABLE}; + public static final SchemaChangeEventType[] CREATE = {CREATE_TABLE}; - public static final SchemaChangeEventType[] DROP = {SchemaChangeEventType.DROP_COLUMN}; + public static final SchemaChangeEventType[] DROP = {DROP_COLUMN, DROP_TABLE}; - public static final SchemaChangeEventType[] RENAME = {SchemaChangeEventType.RENAME_COLUMN}; + public static final SchemaChangeEventType[] RENAME = {RENAME_COLUMN}; - public static final SchemaChangeEventType[] TABLE = {SchemaChangeEventType.CREATE_TABLE}; + 
public static final SchemaChangeEventType[] TABLE = {CREATE_TABLE, DROP_TABLE, TRUNCATE_TABLE}; public static final SchemaChangeEventType[] COLUMN = { - SchemaChangeEventType.ADD_COLUMN, - SchemaChangeEventType.ALTER_COLUMN_TYPE, - SchemaChangeEventType.DROP_COLUMN, - SchemaChangeEventType.RENAME_COLUMN + ADD_COLUMN, ALTER_COLUMN_TYPE, DROP_COLUMN, RENAME_COLUMN }; public static final SchemaChangeEventType[] ALL = { - SchemaChangeEventType.ADD_COLUMN, - SchemaChangeEventType.CREATE_TABLE, - SchemaChangeEventType.ALTER_COLUMN_TYPE, - SchemaChangeEventType.DROP_COLUMN, - SchemaChangeEventType.RENAME_COLUMN + ADD_COLUMN, + ALTER_COLUMN_TYPE, + CREATE_TABLE, + DROP_COLUMN, + DROP_TABLE, + RENAME_COLUMN, + TRUNCATE_TABLE }; public static final SchemaChangeEventType[] NONE = {}; diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/SchemaChangeEventWithPreSchema.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/SchemaChangeEventWithPreSchema.java new file mode 100644 index 0000000000..155c103cd3 --- /dev/null +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/SchemaChangeEventWithPreSchema.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.common.event; + +import org.apache.flink.cdc.common.annotation.PublicEvolving; +import org.apache.flink.cdc.common.schema.Schema; + +/** A {@link SchemaChangeEvent} that supports appending schema before change event. */ +@PublicEvolving +public interface SchemaChangeEventWithPreSchema extends SchemaChangeEvent { + + /** Describes if this event already has schema before change info. */ + boolean hasPreSchema(); + + /** Append schema before change info to this event. */ + void fillPreSchema(Schema oldSchema); + + /** Check if this event contains redundant schema change request only. */ + default boolean trimRedundantChanges() { + return false; + } +} diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/TruncateTableEvent.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/TruncateTableEvent.java new file mode 100644 index 0000000000..2144ff2837 --- /dev/null +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/TruncateTableEvent.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.common.event; + +import org.apache.flink.cdc.common.annotation.PublicEvolving; +import org.apache.flink.cdc.common.source.DataSource; + +import java.util.Objects; + +/** + * A {@link SchemaChangeEvent} that represents a {@code TRUNCATE TABLE} DDL. This will be sent by + * {@link DataSource} before all {@link DataChangeEvent} with the same tableId. + */ +@PublicEvolving +public class TruncateTableEvent implements SchemaChangeEvent { + + private static final long serialVersionUID = 1L; + + private final TableId tableId; + + public TruncateTableEvent(TableId tableId) { + this.tableId = tableId; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof TruncateTableEvent)) { + return false; + } + TruncateTableEvent that = (TruncateTableEvent) o; + return Objects.equals(tableId, that.tableId); + } + + @Override + public int hashCode() { + return Objects.hash(tableId); + } + + @Override + public String toString() { + return "TruncateTableEvent{" + "tableId=" + tableId + '}'; + } + + @Override + public TableId tableId() { + return tableId; + } + + @Override + public SchemaChangeEventType getType() { + return SchemaChangeEventType.TRUNCATE_TABLE; + } +} diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/AddColumnEventVisitor.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/AddColumnEventVisitor.java new file mode 100644 index 0000000000..e709f7bbe0 --- /dev/null +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/AddColumnEventVisitor.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.common.event.visitor; + +import org.apache.flink.cdc.common.annotation.Internal; +import org.apache.flink.cdc.common.event.AddColumnEvent; + +/** Visitor for {@link AddColumnEvent}s. */ +@Internal +@FunctionalInterface +public interface AddColumnEventVisitor { + T visit(AddColumnEvent event) throws E; +} diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/AlterColumnTypeEventVisitor.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/AlterColumnTypeEventVisitor.java new file mode 100644 index 0000000000..027ddc88d4 --- /dev/null +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/AlterColumnTypeEventVisitor.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.common.event.visitor; + +import org.apache.flink.cdc.common.annotation.Internal; +import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; + +/** Visitor for {@link AlterColumnTypeEvent}s. */ +@Internal +@FunctionalInterface +public interface AlterColumnTypeEventVisitor { + T visit(AlterColumnTypeEvent event) throws E; +} diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/CreateTableEventVisitor.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/CreateTableEventVisitor.java new file mode 100644 index 0000000000..d20f118c93 --- /dev/null +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/CreateTableEventVisitor.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.common.event.visitor; + +import org.apache.flink.cdc.common.annotation.Internal; +import org.apache.flink.cdc.common.event.CreateTableEvent; + +/** Visitor for {@link CreateTableEvent}s. */ +@Internal +@FunctionalInterface +public interface CreateTableEventVisitor { + T visit(CreateTableEvent event) throws E; +} diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/DropColumnEventVisitor.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/DropColumnEventVisitor.java new file mode 100644 index 0000000000..1a79baa3d3 --- /dev/null +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/DropColumnEventVisitor.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.common.event.visitor; + +import org.apache.flink.cdc.common.annotation.Internal; +import org.apache.flink.cdc.common.event.DropColumnEvent; + +/** Visitor for {@link DropColumnEvent}s. 
*/ +@Internal +@FunctionalInterface +public interface DropColumnEventVisitor { + T visit(DropColumnEvent event) throws E; +} diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/DropTableEventVisitor.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/DropTableEventVisitor.java new file mode 100644 index 0000000000..074ae67ee0 --- /dev/null +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/DropTableEventVisitor.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.common.event.visitor; + +import org.apache.flink.cdc.common.annotation.Internal; +import org.apache.flink.cdc.common.event.DropTableEvent; + +/** Visitor for {@link DropTableEvent}s. 
*/ +@Internal +@FunctionalInterface +public interface DropTableEventVisitor { + T visit(DropTableEvent event) throws E; +} diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/RenameColumnEventVisitor.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/RenameColumnEventVisitor.java new file mode 100644 index 0000000000..304452528d --- /dev/null +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/RenameColumnEventVisitor.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.common.event.visitor; + +import org.apache.flink.cdc.common.annotation.Internal; +import org.apache.flink.cdc.common.event.RenameColumnEvent; + +/** Visitor for {@link RenameColumnEvent}s. 
*/ +@Internal +@FunctionalInterface +public interface RenameColumnEventVisitor { + T visit(RenameColumnEvent event) throws E; +} diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/SchemaChangeEventVisitor.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/SchemaChangeEventVisitor.java new file mode 100644 index 0000000000..c905c8dee4 --- /dev/null +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/SchemaChangeEventVisitor.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.common.event.visitor; + +import org.apache.flink.cdc.common.annotation.Internal; +import org.apache.flink.cdc.common.event.AddColumnEvent; +import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; +import org.apache.flink.cdc.common.event.CreateTableEvent; +import org.apache.flink.cdc.common.event.DropColumnEvent; +import org.apache.flink.cdc.common.event.DropTableEvent; +import org.apache.flink.cdc.common.event.RenameColumnEvent; +import org.apache.flink.cdc.common.event.SchemaChangeEvent; +import org.apache.flink.cdc.common.event.TruncateTableEvent; + +/** Visitor class for all {@link SchemaChangeEvent}s and returns a specific typed object. */ +@Internal +public class SchemaChangeEventVisitor { + public static T visit( + SchemaChangeEvent event, + AddColumnEventVisitor addColumnVisitor, + AlterColumnTypeEventVisitor alterColumnTypeEventVisitor, + CreateTableEventVisitor createTableEventVisitor, + DropColumnEventVisitor dropColumnEventVisitor, + DropTableEventVisitor dropTableEventVisitor, + RenameColumnEventVisitor renameColumnEventVisitor, + TruncateTableEventVisitor truncateTableEventVisitor) + throws E { + if (event instanceof AddColumnEvent) { + if (addColumnVisitor == null) { + return null; + } + return addColumnVisitor.visit((AddColumnEvent) event); + } else if (event instanceof AlterColumnTypeEvent) { + if (alterColumnTypeEventVisitor == null) { + return null; + } + return alterColumnTypeEventVisitor.visit((AlterColumnTypeEvent) event); + } else if (event instanceof CreateTableEvent) { + if (createTableEventVisitor == null) { + return null; + } + return createTableEventVisitor.visit((CreateTableEvent) event); + } else if (event instanceof DropColumnEvent) { + if (dropColumnEventVisitor == null) { + return null; + } + return dropColumnEventVisitor.visit((DropColumnEvent) event); + } else if (event instanceof DropTableEvent) { + if (dropTableEventVisitor == null) { + return null; + } + return
dropTableEventVisitor.visit((DropTableEvent) event); + } else if (event instanceof RenameColumnEvent) { + if (renameColumnEventVisitor == null) { + return null; + } + return renameColumnEventVisitor.visit((RenameColumnEvent) event); + } else if (event instanceof TruncateTableEvent) { + if (truncateTableEventVisitor == null) { + return null; + } + return truncateTableEventVisitor.visit((TruncateTableEvent) event); + } else { + throw new IllegalArgumentException( + "Unknown schema change event type " + event.getType()); + } + } +} diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/TruncateTableEventVisitor.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/TruncateTableEventVisitor.java new file mode 100644 index 0000000000..020c5e8c91 --- /dev/null +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/TruncateTableEventVisitor.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.common.event.visitor; + +import org.apache.flink.cdc.common.annotation.Internal; +import org.apache.flink.cdc.common.event.TruncateTableEvent; + +/** Visitor for {@link TruncateTableEvent}s. 
*/ +@Internal +@FunctionalInterface +public interface TruncateTableEventVisitor { + T visit(TruncateTableEvent event) throws E; +} diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/ChangeEventUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/ChangeEventUtils.java index ee72ef6e23..940dc31448 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/ChangeEventUtils.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/ChangeEventUtils.java @@ -23,11 +23,14 @@ import org.apache.flink.cdc.common.event.CreateTableEvent; import org.apache.flink.cdc.common.event.DataChangeEvent; import org.apache.flink.cdc.common.event.DropColumnEvent; +import org.apache.flink.cdc.common.event.DropTableEvent; import org.apache.flink.cdc.common.event.RenameColumnEvent; import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.common.event.SchemaChangeEventType; import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily; import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.event.TruncateTableEvent; +import org.apache.flink.cdc.common.event.visitor.SchemaChangeEventVisitor; import java.util.ArrayList; import java.util.Arrays; @@ -65,30 +68,22 @@ public static DataChangeEvent recreateDataChangeEvent( public static SchemaChangeEvent recreateSchemaChangeEvent( SchemaChangeEvent schemaChangeEvent, TableId tableId) { - if (schemaChangeEvent instanceof CreateTableEvent) { - CreateTableEvent createTableEvent = (CreateTableEvent) schemaChangeEvent; - return new CreateTableEvent(tableId, createTableEvent.getSchema()); - } - if (schemaChangeEvent instanceof AlterColumnTypeEvent) { - AlterColumnTypeEvent alterColumnTypeEvent = (AlterColumnTypeEvent) schemaChangeEvent; - return new AlterColumnTypeEvent(tableId, alterColumnTypeEvent.getTypeMapping()); - } - if (schemaChangeEvent instanceof RenameColumnEvent) { - RenameColumnEvent 
renameColumnEvent = (RenameColumnEvent) schemaChangeEvent; - return new RenameColumnEvent(tableId, renameColumnEvent.getNameMapping()); - } - if (schemaChangeEvent instanceof DropColumnEvent) { - DropColumnEvent dropColumnEvent = (DropColumnEvent) schemaChangeEvent; - return new DropColumnEvent(tableId, dropColumnEvent.getDroppedColumnNames()); - } - if (schemaChangeEvent instanceof AddColumnEvent) { - AddColumnEvent addColumnEvent = (AddColumnEvent) schemaChangeEvent; - return new AddColumnEvent(tableId, addColumnEvent.getAddedColumns()); - } - throw new UnsupportedOperationException( - String.format( - "Unsupported schema change event with type \"%s\"", - schemaChangeEvent.getClass().getCanonicalName())); + + return SchemaChangeEventVisitor.visit( + schemaChangeEvent, + addColumnEvent -> new AddColumnEvent(tableId, addColumnEvent.getAddedColumns()), + alterColumnEvent -> + new AlterColumnTypeEvent( + tableId, + alterColumnEvent.getTypeMapping(), + alterColumnEvent.getOldTypeMapping()), + createTableEvent -> new CreateTableEvent(tableId, createTableEvent.getSchema()), + dropColumnEvent -> + new DropColumnEvent(tableId, dropColumnEvent.getDroppedColumnNames()), + dropTableEvent -> new DropTableEvent(tableId), + renameColumnEvent -> + new RenameColumnEvent(tableId, renameColumnEvent.getNameMapping()), + truncateTableEvent -> new TruncateTableEvent(tableId)); } public static Set resolveSchemaEvolutionOptions( diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java index 68cdcd0e1d..04e5f59815 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java @@ -25,6 +25,7 @@ import org.apache.flink.cdc.common.event.DropColumnEvent; import org.apache.flink.cdc.common.event.RenameColumnEvent; import 
org.apache.flink.cdc.common.event.SchemaChangeEvent; +import org.apache.flink.cdc.common.event.visitor.SchemaChangeEventVisitor; import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.types.DataType; @@ -244,20 +245,15 @@ public static int getNumericPrecision(DataType dataType) { /** apply SchemaChangeEvent to the old schema and return the schema after changing. */ public static Schema applySchemaChangeEvent(Schema schema, SchemaChangeEvent event) { - if (event instanceof AddColumnEvent) { - return applyAddColumnEvent((AddColumnEvent) event, schema); - } else if (event instanceof DropColumnEvent) { - return applyDropColumnEvent((DropColumnEvent) event, schema); - } else if (event instanceof RenameColumnEvent) { - return applyRenameColumnEvent((RenameColumnEvent) event, schema); - } else if (event instanceof AlterColumnTypeEvent) { - return applyAlterColumnTypeEvent((AlterColumnTypeEvent) event, schema); - } else { - throw new UnsupportedOperationException( - String.format( - "Unsupported schema change event type \"%s\"", - event.getClass().getCanonicalName())); - } + return SchemaChangeEventVisitor.visit( + event, + addColumnEvent -> applyAddColumnEvent(addColumnEvent, schema), + alterColumnTypeEvent -> applyAlterColumnTypeEvent(alterColumnTypeEvent, schema), + createTableEvent -> createTableEvent.getSchema(), + dropColumnEvent -> applyDropColumnEvent(dropColumnEvent, schema), + dropTableEvent -> schema, + renameColumnEvent -> applyRenameColumnEvent(renameColumnEvent, schema), + truncateTableEvent -> schema); } private static Schema applyAddColumnEvent(AddColumnEvent event, Schema oldSchema) { diff --git a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/ChangeEventUtilsTest.java b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/ChangeEventUtilsTest.java index 0a34a75018..fd3636191d 100644 --- 
a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/ChangeEventUtilsTest.java +++ b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/ChangeEventUtilsTest.java @@ -17,7 +17,6 @@ package org.apache.flink.cdc.common.utils; -import org.assertj.core.api.Assertions; import org.assertj.core.util.Sets; import org.junit.jupiter.api.Test; @@ -28,91 +27,108 @@ import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ALTER_COLUMN_TYPE; import static org.apache.flink.cdc.common.event.SchemaChangeEventType.CREATE_TABLE; import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_COLUMN; +import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_TABLE; import static org.apache.flink.cdc.common.event.SchemaChangeEventType.RENAME_COLUMN; +import static org.apache.flink.cdc.common.event.SchemaChangeEventType.TRUNCATE_TABLE; +import static org.apache.flink.cdc.common.testutils.assertions.EventAssertions.assertThat; /** A test for the {@link org.apache.flink.cdc.common.utils.ChangeEventUtils}. 
*/ public class ChangeEventUtilsTest { @Test public void testResolveSchemaEvolutionOptions() { - Assertions.assertThat( + assertThat( ChangeEventUtils.resolveSchemaEvolutionOptions( Collections.emptyList(), Collections.emptyList())) .isEqualTo( Sets.set( + TRUNCATE_TABLE, + RENAME_COLUMN, CREATE_TABLE, - ADD_COLUMN, + DROP_TABLE, ALTER_COLUMN_TYPE, - DROP_COLUMN, - RENAME_COLUMN)); + ADD_COLUMN, + DROP_COLUMN)); - Assertions.assertThat( + assertThat( ChangeEventUtils.resolveSchemaEvolutionOptions( Collections.emptyList(), Collections.singletonList("drop"))) - .isEqualTo(Sets.set(CREATE_TABLE, ADD_COLUMN, ALTER_COLUMN_TYPE, RENAME_COLUMN)); + .isEqualTo( + Sets.set( + ADD_COLUMN, + ALTER_COLUMN_TYPE, + RENAME_COLUMN, + CREATE_TABLE, + TRUNCATE_TABLE)); - Assertions.assertThat( + assertThat( ChangeEventUtils.resolveSchemaEvolutionOptions( Arrays.asList("create", "add"), Collections.emptyList())) - .isEqualTo(Sets.set(CREATE_TABLE, ADD_COLUMN)); + .isEqualTo(Sets.set(ADD_COLUMN, CREATE_TABLE)); - Assertions.assertThat( + assertThat( ChangeEventUtils.resolveSchemaEvolutionOptions( Collections.singletonList("column"), Collections.singletonList("drop.column"))) .isEqualTo(Sets.set(ADD_COLUMN, ALTER_COLUMN_TYPE, RENAME_COLUMN)); - Assertions.assertThat( + assertThat( ChangeEventUtils.resolveSchemaEvolutionOptions( Collections.emptyList(), Collections.singletonList("drop.column"))) - .isEqualTo(Sets.set(CREATE_TABLE, ADD_COLUMN, ALTER_COLUMN_TYPE, RENAME_COLUMN)); + .isEqualTo( + Sets.set( + ADD_COLUMN, + DROP_TABLE, + TRUNCATE_TABLE, + RENAME_COLUMN, + ALTER_COLUMN_TYPE, + CREATE_TABLE)); } @Test public void testResolveSchemaEvolutionTag() { - Assertions.assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("all")) + assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("all")) .isEqualTo( Arrays.asList( ADD_COLUMN, - CREATE_TABLE, ALTER_COLUMN_TYPE, + CREATE_TABLE, DROP_COLUMN, - RENAME_COLUMN)); + DROP_TABLE, + RENAME_COLUMN, + TRUNCATE_TABLE)); - 
Assertions.assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("column")) + assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("column")) .isEqualTo( Arrays.asList(ADD_COLUMN, ALTER_COLUMN_TYPE, DROP_COLUMN, RENAME_COLUMN)); - Assertions.assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("table")) - .isEqualTo(Collections.singletonList(CREATE_TABLE)); - - Assertions.assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("rename")) - .isEqualTo(Collections.singletonList(RENAME_COLUMN)); + assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("table")) + .isEqualTo(Arrays.asList(CREATE_TABLE, DROP_TABLE, TRUNCATE_TABLE)); - Assertions.assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("rename.column")) + assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("rename.column")) .isEqualTo(Collections.singletonList(RENAME_COLUMN)); - Assertions.assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("drop")) - .isEqualTo(Collections.singletonList(DROP_COLUMN)); + assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("drop")) + .isEqualTo(Arrays.asList(DROP_COLUMN, DROP_TABLE)); - Assertions.assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("drop.column")) + assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("drop.column")) .isEqualTo(Collections.singletonList(DROP_COLUMN)); - Assertions.assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("create")) + assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("create")) .isEqualTo(Collections.singletonList(CREATE_TABLE)); - Assertions.assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("create.table")) + assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("create.table")) .isEqualTo(Collections.singletonList(CREATE_TABLE)); - Assertions.assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("alter")) + assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("alter")) .isEqualTo(Collections.singletonList(ALTER_COLUMN_TYPE)); - 
Assertions.assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("alter.column.type")) + assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("alter.column.type")) .isEqualTo(Collections.singletonList(ALTER_COLUMN_TYPE)); - Assertions.assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("add")) + assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("add")) .isEqualTo(Collections.singletonList(ADD_COLUMN)); - Assertions.assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("add.column")) + assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("add.column")) .isEqualTo(Collections.singletonList(ADD_COLUMN)); } } diff --git a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java index 8a508a8908..11fa5d8149 100644 --- a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java +++ b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java @@ -40,7 +40,7 @@ public class SchemaUtilsTest { @Test - public void testApplySchemaChangeEvent() { + public void testApplyColumnSchemaChangeEvent() { TableId tableId = TableId.parse("default.default.table1"); Schema schema = Schema.newBuilder() diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java index 45cdbe9e85..e746386389 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java @@ -798,7 +798,7 @@ void testMergingWithRoute() throws Exception { "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[2, Bob, 20], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.merged, 
before=[2, Bob, 20], after=[2, Bob, 30], op=UPDATE, meta=()}", "AddColumnEvent{tableId=default_namespace.default_schema.merged, addedColumns=[ColumnWithPosition{column=`description` STRING, position=LAST, existedColumnName=null}]}", - "AlterColumnTypeEvent{tableId=default_namespace.default_schema.merged, nameMapping={age=BIGINT, id=BIGINT}}", + "AlterColumnTypeEvent{tableId=default_namespace.default_schema.merged, typeMapping={age=BIGINT, id=BIGINT}, oldTypeMapping={age=INT, id=INT}}", "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[3, Charlie, 15, student], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[4, Donald, 25, student], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[4, Donald, 25, student], after=[], op=DELETE, meta=()}", @@ -1004,7 +1004,7 @@ void testTransformMergingWithRoute() throws Exception { "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[2, Bob, 20, last_name], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[2, Bob, 20, last_name], after=[2, Bob, 30, last_name], op=UPDATE, meta=()}", "AddColumnEvent{tableId=default_namespace.default_schema.merged, addedColumns=[ColumnWithPosition{column=`description` STRING, position=LAST, existedColumnName=null}]}", - "AlterColumnTypeEvent{tableId=default_namespace.default_schema.merged, nameMapping={age=BIGINT, id=BIGINT}}", + "AlterColumnTypeEvent{tableId=default_namespace.default_schema.merged, typeMapping={age=BIGINT, id=BIGINT}, oldTypeMapping={age=INT, id=INT}}", "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[3, Charlie, 15, last_name, student], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[4, Donald, 25, last_name, student], op=INSERT, meta=()}", 
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[4, Donald, 25, last_name, student], after=[], op=DELETE, meta=()}", diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java index 811d03298d..5ee181985d 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java @@ -26,6 +26,7 @@ import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.common.event.SchemaChangeEventType; import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.event.visitor.SchemaChangeEventVisitor; import org.apache.flink.cdc.common.exceptions.SchemaEvolveException; import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException; import org.apache.flink.cdc.common.schema.Column; @@ -46,12 +47,10 @@ import org.apache.doris.flink.catalog.doris.FieldSchema; import org.apache.doris.flink.catalog.doris.TableSchema; import org.apache.doris.flink.cfg.DorisOptions; -import org.apache.doris.flink.exception.IllegalArgumentException; import org.apache.doris.flink.sink.schema.SchemaChangeManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.LinkedHashMap; @@ -60,6 +59,7 @@ import java.util.Set; import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ADD_COLUMN; +import static 
org.apache.flink.cdc.common.event.SchemaChangeEventType.ALTER_COLUMN_TYPE; import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_COLUMN; import static org.apache.flink.cdc.common.event.SchemaChangeEventType.RENAME_COLUMN; import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.TABLE_CREATE_PROPERTIES_PREFIX; @@ -93,52 +93,66 @@ public boolean acceptsSchemaEvolutionType(SchemaChangeEventType schemaChangeEven @Override public Set getSupportedSchemaEvolutionTypes() { - return Sets.newHashSet(ADD_COLUMN, DROP_COLUMN, RENAME_COLUMN); + return Sets.newHashSet(ADD_COLUMN, ALTER_COLUMN_TYPE, DROP_COLUMN, RENAME_COLUMN); } @Override - public void applySchemaChange(SchemaChangeEvent event) throws SchemaEvolveException { + public void applySchemaChange(SchemaChangeEvent event) { + SchemaChangeEventVisitor.visit( + event, + addColumnEvent -> { + applyAddColumnEvent(addColumnEvent); + return null; + }, + alterColumnTypeEvent -> { + applyAlterColumnTypeEvent(alterColumnTypeEvent); + return null; + }, + createTableEvent -> { + applyCreateTableEvent(createTableEvent); + return null; + }, + dropColumnEvent -> { + applyDropColumnEvent(dropColumnEvent); + return null; + }, + dropTableEvent -> { + throw new UnsupportedSchemaChangeEventException(event); + }, + renameColumnEvent -> { + applyRenameColumnEvent(renameColumnEvent); + return null; + }, + truncateTableEvent -> { + throw new UnsupportedSchemaChangeEventException(event); + }); + } + + private void applyCreateTableEvent(CreateTableEvent event) throws SchemaEvolveException { try { - // send schema change op to doris - if (event instanceof CreateTableEvent) { - applyCreateTableEvent((CreateTableEvent) event); - } else if (event instanceof AddColumnEvent) { - applyAddColumnEvent((AddColumnEvent) event); - } else if (event instanceof DropColumnEvent) { - applyDropColumnEvent((DropColumnEvent) event); - } else if (event instanceof RenameColumnEvent) { - 
applyRenameColumnEvent((RenameColumnEvent) event); - } else if (event instanceof AlterColumnTypeEvent) { - applyAlterColumnTypeEvent((AlterColumnTypeEvent) event); + Schema schema = event.getSchema(); + TableId tableId = event.tableId(); + TableSchema tableSchema = new TableSchema(); + tableSchema.setTable(tableId.getTableName()); + tableSchema.setDatabase(tableId.getSchemaName()); + tableSchema.setFields(buildFields(schema)); + tableSchema.setDistributeKeys(buildDistributeKeys(schema)); + + if (CollectionUtil.isNullOrEmpty(schema.primaryKeys())) { + tableSchema.setModel(DataModel.DUPLICATE); } else { - throw new UnsupportedSchemaChangeEventException(event); + tableSchema.setKeys(schema.primaryKeys()); + tableSchema.setModel(DataModel.UNIQUE); } - } catch (Exception ex) { - throw new SchemaEvolveException(event, ex.getMessage(), null); - } - } - private void applyCreateTableEvent(CreateTableEvent event) - throws IOException, IllegalArgumentException { - Schema schema = event.getSchema(); - TableId tableId = event.tableId(); - TableSchema tableSchema = new TableSchema(); - tableSchema.setTable(tableId.getTableName()); - tableSchema.setDatabase(tableId.getSchemaName()); - tableSchema.setFields(buildFields(schema)); - tableSchema.setDistributeKeys(buildDistributeKeys(schema)); - - if (CollectionUtil.isNullOrEmpty(schema.primaryKeys())) { - tableSchema.setModel(DataModel.DUPLICATE); - } else { - tableSchema.setKeys(schema.primaryKeys()); - tableSchema.setModel(DataModel.UNIQUE); + Map tableProperties = + DorisDataSinkOptions.getPropertiesByPrefix( + config, TABLE_CREATE_PROPERTIES_PREFIX); + tableSchema.setProperties(tableProperties); + schemaChangeManager.createTable(tableSchema); + } catch (Exception e) { + throw new SchemaEvolveException(event, e.getMessage(), e); } - - Map tableProperties = - DorisDataSinkOptions.getPropertiesByPrefix(config, TABLE_CREATE_PROPERTIES_PREFIX); - tableSchema.setProperties(tableProperties); - 
schemaChangeManager.createTable(tableSchema); } private Map buildFields(Schema schema) { @@ -191,59 +205,74 @@ private String buildTypeString(DataType dataType) { } } - private void applyAddColumnEvent(AddColumnEvent event) - throws IOException, IllegalArgumentException { - TableId tableId = event.tableId(); - List addedColumns = event.getAddedColumns(); - for (AddColumnEvent.ColumnWithPosition col : addedColumns) { - Column column = col.getAddColumn(); - FieldSchema addFieldSchema = - new FieldSchema( - column.getName(), - buildTypeString(column.getType()), - column.getDefaultValueExpression(), - column.getComment()); - schemaChangeManager.addColumn( - tableId.getSchemaName(), tableId.getTableName(), addFieldSchema); + private void applyAddColumnEvent(AddColumnEvent event) throws SchemaEvolveException { + try { + TableId tableId = event.tableId(); + List addedColumns = event.getAddedColumns(); + for (AddColumnEvent.ColumnWithPosition col : addedColumns) { + Column column = col.getAddColumn(); + FieldSchema addFieldSchema = + new FieldSchema( + column.getName(), + buildTypeString(column.getType()), + column.getDefaultValueExpression(), + column.getComment()); + schemaChangeManager.addColumn( + tableId.getSchemaName(), tableId.getTableName(), addFieldSchema); + } + } catch (Exception e) { + throw new SchemaEvolveException(event, e.getMessage(), e); } } - private void applyDropColumnEvent(DropColumnEvent event) - throws IOException, IllegalArgumentException { - TableId tableId = event.tableId(); - List droppedColumns = event.getDroppedColumnNames(); - for (String col : droppedColumns) { - schemaChangeManager.dropColumn(tableId.getSchemaName(), tableId.getTableName(), col); + private void applyDropColumnEvent(DropColumnEvent event) throws SchemaEvolveException { + try { + TableId tableId = event.tableId(); + List droppedColumns = event.getDroppedColumnNames(); + for (String col : droppedColumns) { + schemaChangeManager.dropColumn( + tableId.getSchemaName(), 
tableId.getTableName(), col); + } + } catch (Exception e) { + throw new SchemaEvolveException(event, e.getMessage(), e); } } - private void applyRenameColumnEvent(RenameColumnEvent event) - throws IOException, IllegalArgumentException { - TableId tableId = event.tableId(); - Map nameMapping = event.getNameMapping(); - for (Map.Entry entry : nameMapping.entrySet()) { - schemaChangeManager.renameColumn( - tableId.getSchemaName(), - tableId.getTableName(), - entry.getKey(), - entry.getValue()); + private void applyRenameColumnEvent(RenameColumnEvent event) throws SchemaEvolveException { + try { + TableId tableId = event.tableId(); + Map nameMapping = event.getNameMapping(); + for (Map.Entry entry : nameMapping.entrySet()) { + schemaChangeManager.renameColumn( + tableId.getSchemaName(), + tableId.getTableName(), + entry.getKey(), + entry.getValue()); + } + } catch (Exception e) { + throw new SchemaEvolveException(event, e.getMessage(), e); } } private void applyAlterColumnTypeEvent(AlterColumnTypeEvent event) - throws IOException, IllegalArgumentException { - TableId tableId = event.tableId(); - Map typeMapping = event.getTypeMapping(); - - for (Map.Entry entry : typeMapping.entrySet()) { - schemaChangeManager.modifyColumnDataType( - tableId.getSchemaName(), - tableId.getTableName(), - new FieldSchema( - entry.getKey(), - buildTypeString(entry.getValue()), - null)); // Currently, AlterColumnTypeEvent carries no comment info. This - // will be fixed after FLINK-35243 got merged. + throws SchemaEvolveException { + try { + TableId tableId = event.tableId(); + Map typeMapping = event.getTypeMapping(); + + for (Map.Entry entry : typeMapping.entrySet()) { + schemaChangeManager.modifyColumnDataType( + tableId.getSchemaName(), + tableId.getTableName(), + new FieldSchema( + entry.getKey(), + buildTypeString(entry.getValue()), + null)); // Currently, AlterColumnTypeEvent carries no comment info. + // This + // will be fixed after FLINK-35243 got merged. 
+ } + } catch (Exception e) { + throw new SchemaEvolveException(event, e.getMessage(), e); } } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java index 79583d83e0..cdb5983a5d 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java @@ -20,8 +20,10 @@ import org.apache.flink.cdc.common.event.AddColumnEvent; import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; import org.apache.flink.cdc.common.event.DropColumnEvent; +import org.apache.flink.cdc.common.event.DropTableEvent; import org.apache.flink.cdc.common.event.RenameColumnEvent; import org.apache.flink.cdc.common.event.SchemaChangeEvent; +import org.apache.flink.cdc.common.event.TruncateTableEvent; import org.apache.flink.cdc.common.types.DataType; import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser; @@ -277,6 +279,25 @@ public void exitAlterByRenameColumn(MySqlParser.AlterByRenameColumnContext ctx) super.exitAlterByRenameColumn(ctx); } + @Override + public void exitTruncateTable(MySqlParser.TruncateTableContext ctx) { + TableId tableId = parser.parseQualifiedTableId(ctx.tableName().fullId()); + changes.add(new TruncateTableEvent(toCdcTableId(tableId))); + super.exitTruncateTable(ctx); + } + + @Override + public void exitDropTable(MySqlParser.DropTableContext ctx) { + ctx.tables() + .tableName() + .forEach( + evt -> { + TableId tableId = 
parser.parseQualifiedTableId(evt.fullId()); + changes.add(new DropTableEvent(toCdcTableId(tableId))); + }); + super.exitDropTable(ctx); + } + private org.apache.flink.cdc.common.schema.Column toCdcColumn(Column dbzColumn) { return org.apache.flink.cdc.common.schema.Column.physicalColumn( dbzColumn.name(), diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java index bd2c059bbf..9aadcbef26 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java @@ -25,9 +25,11 @@ import org.apache.flink.cdc.common.event.CreateTableEvent; import org.apache.flink.cdc.common.event.DataChangeEvent; import org.apache.flink.cdc.common.event.DropColumnEvent; +import org.apache.flink.cdc.common.event.DropTableEvent; import org.apache.flink.cdc.common.event.Event; import org.apache.flink.cdc.common.event.RenameColumnEvent; import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.event.TruncateTableEvent; import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.source.FlinkSourceProvider; @@ -61,6 +63,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.stream.Collectors; import java.util.stream.Stream; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCHEMA_CHANGE_ENABLED; @@ -345,11 +348,144 @@ public void 
testParseAlterStatement() throws Exception { Collections.singletonList( new AddColumnEvent.ColumnWithPosition( Column.physicalColumn("cols9", DataTypes.CHAR(1)))))); + + // Drop orders table first to remove foreign key restraints + statement.execute( + String.format( + "DROP TABLE `%s`.`orders`;", inventoryDatabase.getDatabaseName())); + + statement.execute( + String.format( + "TRUNCATE TABLE `%s`.`products`;", + inventoryDatabase.getDatabaseName())); + expected.add(new TruncateTableEvent(tableId)); + + statement.execute( + String.format( + "DROP TABLE `%s`.`products`;", inventoryDatabase.getDatabaseName())); + expected.add(new DropTableEvent(tableId)); } List actual = fetchResults(events, expected.size()); assertThat(actual).isEqualTo(expected); } + @Test + public void testSchemaChangeEvents() throws Exception { + env.setParallelism(1); + inventoryDatabase.createAndInitialize(); + MySqlSourceConfigFactory configFactory = + new MySqlSourceConfigFactory() + .hostname(MYSQL8_CONTAINER.getHost()) + .port(MYSQL8_CONTAINER.getDatabasePort()) + .username(TEST_USER) + .password(TEST_PASSWORD) + .databaseList(inventoryDatabase.getDatabaseName()) + .tableList(inventoryDatabase.getDatabaseName() + ".*") + .startupOptions(StartupOptions.latest()) + .serverId(getServerId(env.getParallelism())) + .serverTimeZone("UTC") + .includeSchemaChanges(SCHEMA_CHANGE_ENABLED.defaultValue()); + + FlinkSourceProvider sourceProvider = + (FlinkSourceProvider) new MySqlDataSource(configFactory).getEventSourceProvider(); + CloseableIterator events = + env.fromSource( + sourceProvider.getSource(), + WatermarkStrategy.noWatermarks(), + MySqlDataSourceFactory.IDENTIFIER, + new EventTypeInfo()) + .executeAndCollect(); + Thread.sleep(5_000); + + List expected = + new ArrayList<>( + getInventoryCreateAllTableEvents(inventoryDatabase.getDatabaseName())); + + try (Connection connection = inventoryDatabase.getJdbcConnection(); + Statement statement = connection.createStatement()) { + + 
statement.execute( + String.format( + "ALTER TABLE `%s`.`customers` ADD COLUMN `newcol1` INT NULL;", + inventoryDatabase.getDatabaseName())); + expected.add( + new AddColumnEvent( + TableId.tableId(inventoryDatabase.getDatabaseName(), "customers"), + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn("newcol1", DataTypes.INT()))))); + + // Test MODIFY COLUMN DDL + statement.execute( + String.format( + "ALTER TABLE `%s`.`customers` MODIFY COLUMN `newcol1` DOUBLE;", + inventoryDatabase.getDatabaseName())); + + expected.add( + new AlterColumnTypeEvent( + TableId.tableId(inventoryDatabase.getDatabaseName(), "customers"), + Collections.singletonMap("newcol1", DataTypes.DOUBLE()))); + + // Test CHANGE COLUMN DDL + statement.execute( + String.format( + "ALTER TABLE `%s`.`customers` CHANGE COLUMN `newcol1` `newcol2` INT;", + inventoryDatabase.getDatabaseName())); + + expected.add( + new AlterColumnTypeEvent( + TableId.tableId(inventoryDatabase.getDatabaseName(), "customers"), + Collections.singletonMap("newcol1", DataTypes.INT()))); + + expected.add( + new RenameColumnEvent( + TableId.tableId(inventoryDatabase.getDatabaseName(), "customers"), + Collections.singletonMap("newcol1", "newcol2"))); + + statement.execute( + String.format( + "ALTER TABLE `%s`.`customers` CHANGE COLUMN `newcol2` `newcol1` DOUBLE;", + inventoryDatabase.getDatabaseName())); + + expected.add( + new AlterColumnTypeEvent( + TableId.tableId(inventoryDatabase.getDatabaseName(), "customers"), + Collections.singletonMap("newcol2", DataTypes.DOUBLE()))); + + expected.add( + new RenameColumnEvent( + TableId.tableId(inventoryDatabase.getDatabaseName(), "customers"), + Collections.singletonMap("newcol2", "newcol1"))); + + // Test truncate table DDL + statement.execute( + String.format( + "TRUNCATE TABLE `%s`.`orders`;", inventoryDatabase.getDatabaseName())); + + expected.add( + new TruncateTableEvent( + TableId.tableId(inventoryDatabase.getDatabaseName(), "orders"))); + 
+ // Test drop table DDL + statement.execute( + String.format( + "DROP TABLE `%s`.`orders`, `%s`.`customers`;", + inventoryDatabase.getDatabaseName(), + inventoryDatabase.getDatabaseName())); + + expected.add( + new DropTableEvent( + TableId.tableId(inventoryDatabase.getDatabaseName(), "orders"))); + expected.add( + new DropTableEvent( + TableId.tableId(inventoryDatabase.getDatabaseName(), "customers"))); + } + List actual = fetchResults(events, expected.size()); + assertEqualsInAnyOrder( + expected.stream().map(Object::toString).collect(Collectors.toList()), + actual.stream().map(Object::toString).collect(Collectors.toList())); + } + private CreateTableEvent getProductsCreateTableEvent(TableId tableId) { return new CreateTableEvent( tableId, @@ -362,6 +498,46 @@ private CreateTableEvent getProductsCreateTableEvent(TableId tableId) { .build()); } + private List getInventoryCreateAllTableEvents(String databaseName) { + return Arrays.asList( + new CreateTableEvent( + TableId.tableId(databaseName, "products"), + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT().notNull()) + .physicalColumn("name", DataTypes.VARCHAR(255).notNull(), "flink") + .physicalColumn("description", DataTypes.VARCHAR(512)) + .physicalColumn("weight", DataTypes.FLOAT()) + .primaryKey(Collections.singletonList("id")) + .build()), + new CreateTableEvent( + TableId.tableId(databaseName, "customers"), + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT().notNull()) + .physicalColumn("first_name", DataTypes.VARCHAR(255).notNull()) + .physicalColumn("last_name", DataTypes.VARCHAR(255).notNull()) + .physicalColumn("email", DataTypes.VARCHAR(255).notNull()) + .primaryKey(Collections.singletonList("id")) + .build()), + new CreateTableEvent( + TableId.tableId(databaseName, "orders"), + Schema.newBuilder() + .physicalColumn("order_number", DataTypes.INT().notNull()) + .physicalColumn("order_date", DataTypes.DATE().notNull()) + .physicalColumn("purchaser", DataTypes.INT().notNull()) + 
.physicalColumn("quantity", DataTypes.INT().notNull()) + .physicalColumn("product_id", DataTypes.INT().notNull()) + .primaryKey(Collections.singletonList("order_number")) + .build()), + new CreateTableEvent( + TableId.tableId(databaseName, "multi_max_table"), + Schema.newBuilder() + .physicalColumn("order_id", DataTypes.VARCHAR(128).notNull()) + .physicalColumn("index", DataTypes.INT().notNull()) + .physicalColumn("desc", DataTypes.VARCHAR(512).notNull()) + .primaryKey(Arrays.asList("order_id", "index")) + .build())); + } + private List getSnapshotExpected(TableId tableId) { RowType rowType = RowType.of( diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java index 3ae3c9c4c7..5a349946b6 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java @@ -25,6 +25,7 @@ import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.common.event.SchemaChangeEventType; import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.event.visitor.SchemaChangeEventVisitor; import org.apache.flink.cdc.common.exceptions.SchemaEvolveException; import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException; import org.apache.flink.cdc.common.schema.Schema; @@ -117,110 +118,134 @@ public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) if (catalog == null) { catalog = 
FlinkCatalogFactory.createPaimonCatalog(catalogOptions); } + SchemaChangeEventVisitor.visit( + schemaChangeEvent, + addColumnEvent -> { + applyAddColumn(addColumnEvent); + return null; + }, + alterColumnTypeEvent -> { + applyAlterColumnType(alterColumnTypeEvent); + return null; + }, + createTableEvent -> { + applyCreateTable(createTableEvent); + return null; + }, + dropColumnEvent -> { + applyDropColumn(dropColumnEvent); + return null; + }, + dropTableEvent -> { + throw new UnsupportedSchemaChangeEventException(dropTableEvent); + }, + renameColumnEvent -> { + applyRenameColumn(renameColumnEvent); + return null; + }, + truncateTableEvent -> { + throw new UnsupportedSchemaChangeEventException(truncateTableEvent); + }); + } + + private void applyCreateTable(CreateTableEvent event) throws SchemaEvolveException { try { - if (schemaChangeEvent instanceof CreateTableEvent) { - applyCreateTable((CreateTableEvent) schemaChangeEvent); - } else if (schemaChangeEvent instanceof AddColumnEvent) { - applyAddColumn((AddColumnEvent) schemaChangeEvent); - } else if (schemaChangeEvent instanceof DropColumnEvent) { - applyDropColumn((DropColumnEvent) schemaChangeEvent); - } else if (schemaChangeEvent instanceof RenameColumnEvent) { - applyRenameColumn((RenameColumnEvent) schemaChangeEvent); - } else if (schemaChangeEvent instanceof AlterColumnTypeEvent) { - applyAlterColumn((AlterColumnTypeEvent) schemaChangeEvent); - } else { - throw new UnsupportedSchemaChangeEventException(schemaChangeEvent); + if (!catalog.databaseExists(event.tableId().getSchemaName())) { + catalog.createDatabase(event.tableId().getSchemaName(), true); } - } catch (Exception e) { - throw new SchemaEvolveException(schemaChangeEvent, "schema change applying failure", e); + Schema schema = event.getSchema(); + org.apache.paimon.schema.Schema.Builder builder = + new org.apache.paimon.schema.Schema.Builder(); + schema.getColumns() + .forEach( + (column) -> + builder.column( + column.getName(), + 
LogicalTypeConversion.toDataType( + DataTypeUtils.toFlinkDataType(column.getType()) + .getLogicalType()))); + builder.primaryKey(schema.primaryKeys().toArray(new String[0])); + if (partitionMaps.containsKey(event.tableId())) { + builder.partitionKeys(partitionMaps.get(event.tableId())); + } else if (schema.partitionKeys() != null && !schema.partitionKeys().isEmpty()) { + builder.partitionKeys(schema.partitionKeys()); + } + builder.options(tableOptions); + builder.options(schema.options()); + catalog.createTable( + new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()), + builder.build(), + true); + } catch (Catalog.TableAlreadyExistException + | Catalog.DatabaseNotExistException + | Catalog.DatabaseAlreadyExistException e) { + throw new SchemaEvolveException(event, e.getMessage(), e); } } - private void applyCreateTable(CreateTableEvent event) - throws Catalog.DatabaseAlreadyExistException, Catalog.TableAlreadyExistException, - Catalog.DatabaseNotExistException { - if (!catalog.databaseExists(event.tableId().getSchemaName())) { - catalog.createDatabase(event.tableId().getSchemaName(), true); - } - Schema schema = event.getSchema(); - org.apache.paimon.schema.Schema.Builder builder = - new org.apache.paimon.schema.Schema.Builder(); - schema.getColumns() - .forEach( - (column) -> - builder.column( - column.getName(), - LogicalTypeConversion.toDataType( - DataTypeUtils.toFlinkDataType(column.getType()) - .getLogicalType()))); - builder.primaryKey(schema.primaryKeys().toArray(new String[0])); - if (partitionMaps.containsKey(event.tableId())) { - builder.partitionKeys(partitionMaps.get(event.tableId())); - } else if (schema.partitionKeys() != null && !schema.partitionKeys().isEmpty()) { - builder.partitionKeys(schema.partitionKeys()); + private void applyAddColumn(AddColumnEvent event) throws SchemaEvolveException { + try { + List tableChangeList = applyAddColumnEventWithPosition(event); + catalog.alterTable( + new 
Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()), + tableChangeList, + true); + } catch (Catalog.TableNotExistException + | Catalog.ColumnAlreadyExistException + | Catalog.ColumnNotExistException e) { + throw new SchemaEvolveException(event, e.getMessage(), e); } - builder.options(tableOptions); - builder.options(schema.options()); - catalog.createTable( - new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()), - builder.build(), - true); - } - - private void applyAddColumn(AddColumnEvent event) - throws Catalog.TableNotExistException, Catalog.ColumnAlreadyExistException, - Catalog.ColumnNotExistException { - List tableChangeList = applyAddColumnEventWithPosition(event); - catalog.alterTable( - new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()), - tableChangeList, - true); } private List applyAddColumnEventWithPosition(AddColumnEvent event) - throws Catalog.TableNotExistException { - List tableChangeList = new ArrayList<>(); - for (AddColumnEvent.ColumnWithPosition columnWithPosition : event.getAddedColumns()) { - SchemaChange tableChange; - switch (columnWithPosition.getPosition()) { - case FIRST: - tableChange = - SchemaChangeProvider.add( - columnWithPosition, - SchemaChange.Move.first( - columnWithPosition.getAddColumn().getName())); - tableChangeList.add(tableChange); - break; - case LAST: - SchemaChange schemaChangeWithLastPosition = - SchemaChangeProvider.add(columnWithPosition); - tableChangeList.add(schemaChangeWithLastPosition); - break; - case BEFORE: - SchemaChange schemaChangeWithBeforePosition = - applyAddColumnWithBeforePosition( - event.tableId().getSchemaName(), - event.tableId().getTableName(), - columnWithPosition); - tableChangeList.add(schemaChangeWithBeforePosition); - break; - case AFTER: - checkNotNull( - columnWithPosition.getExistedColumnName(), - "Existing column name must be provided for AFTER position"); - SchemaChange.Move after = - 
SchemaChange.Move.after( - columnWithPosition.getAddColumn().getName(), - columnWithPosition.getExistedColumnName()); - tableChange = SchemaChangeProvider.add(columnWithPosition, after); - tableChangeList.add(tableChange); - break; - default: - throw new IllegalArgumentException( - "Unknown column position: " + columnWithPosition.getPosition()); + throws SchemaEvolveException { + try { + List tableChangeList = new ArrayList<>(); + for (AddColumnEvent.ColumnWithPosition columnWithPosition : event.getAddedColumns()) { + SchemaChange tableChange; + switch (columnWithPosition.getPosition()) { + case FIRST: + tableChange = + SchemaChangeProvider.add( + columnWithPosition, + SchemaChange.Move.first( + columnWithPosition.getAddColumn().getName())); + tableChangeList.add(tableChange); + break; + case LAST: + SchemaChange schemaChangeWithLastPosition = + SchemaChangeProvider.add(columnWithPosition); + tableChangeList.add(schemaChangeWithLastPosition); + break; + case BEFORE: + SchemaChange schemaChangeWithBeforePosition = + applyAddColumnWithBeforePosition( + event.tableId().getSchemaName(), + event.tableId().getTableName(), + columnWithPosition); + tableChangeList.add(schemaChangeWithBeforePosition); + break; + case AFTER: + checkNotNull( + columnWithPosition.getExistedColumnName(), + "Existing column name must be provided for AFTER position"); + SchemaChange.Move after = + SchemaChange.Move.after( + columnWithPosition.getAddColumn().getName(), + columnWithPosition.getExistedColumnName()); + tableChange = SchemaChangeProvider.add(columnWithPosition, after); + tableChangeList.add(tableChange); + break; + default: + throw new SchemaEvolveException( + event, + "Unknown column position: " + columnWithPosition.getPosition()); + } } + return tableChangeList; + } catch (Catalog.TableNotExistException e) { + throw new SchemaEvolveException(event, e.getMessage(), e); } - return tableChangeList; } private SchemaChange applyAddColumnWithBeforePosition( @@ -248,44 +273,58 @@ private 
int checkColumnPosition(String existedColumnName, List columnNam return index; } - private void applyDropColumn(DropColumnEvent event) - throws Catalog.ColumnAlreadyExistException, Catalog.TableNotExistException, - Catalog.ColumnNotExistException { - List tableChangeList = new ArrayList<>(); - event.getDroppedColumnNames() - .forEach((column) -> tableChangeList.add(SchemaChangeProvider.drop(column))); - catalog.alterTable( - new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()), - tableChangeList, - true); + private void applyDropColumn(DropColumnEvent event) throws SchemaEvolveException { + try { + List tableChangeList = new ArrayList<>(); + event.getDroppedColumnNames() + .forEach((column) -> tableChangeList.add(SchemaChangeProvider.drop(column))); + catalog.alterTable( + new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()), + tableChangeList, + true); + } catch (Catalog.TableNotExistException + | Catalog.ColumnAlreadyExistException + | Catalog.ColumnNotExistException e) { + throw new SchemaEvolveException(event, e.getMessage(), e); + } } - private void applyRenameColumn(RenameColumnEvent event) - throws Catalog.ColumnAlreadyExistException, Catalog.TableNotExistException, - Catalog.ColumnNotExistException { - List tableChangeList = new ArrayList<>(); - event.getNameMapping() - .forEach( - (oldName, newName) -> - tableChangeList.add(SchemaChangeProvider.rename(oldName, newName))); - catalog.alterTable( - new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()), - tableChangeList, - true); + private void applyRenameColumn(RenameColumnEvent event) throws SchemaEvolveException { + try { + List tableChangeList = new ArrayList<>(); + event.getNameMapping() + .forEach( + (oldName, newName) -> + tableChangeList.add( + SchemaChangeProvider.rename(oldName, newName))); + catalog.alterTable( + new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()), + tableChangeList, + true); 
+ } catch (Catalog.TableNotExistException + | Catalog.ColumnAlreadyExistException + | Catalog.ColumnNotExistException e) { + throw new SchemaEvolveException(event, e.getMessage(), e); + } } - private void applyAlterColumn(AlterColumnTypeEvent event) - throws Catalog.ColumnAlreadyExistException, Catalog.TableNotExistException, - Catalog.ColumnNotExistException { - List tableChangeList = new ArrayList<>(); - event.getTypeMapping() - .forEach( - (oldName, newType) -> - tableChangeList.add( - SchemaChangeProvider.updateColumnType(oldName, newType))); - catalog.alterTable( - new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()), - tableChangeList, - true); + private void applyAlterColumnType(AlterColumnTypeEvent event) throws SchemaEvolveException { + try { + List tableChangeList = new ArrayList<>(); + event.getTypeMapping() + .forEach( + (oldName, newType) -> + tableChangeList.add( + SchemaChangeProvider.updateColumnType( + oldName, newType))); + catalog.alterTable( + new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()), + tableChangeList, + true); + } catch (Catalog.TableNotExistException + | Catalog.ColumnAlreadyExistException + | Catalog.ColumnNotExistException e) { + throw new SchemaEvolveException(event, e.getMessage(), e); + } } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java index 865613eda5..25bb2656c6 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java +++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java @@ -25,6 +25,7 @@ import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.common.event.SchemaChangeEventType; import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.event.visitor.SchemaChangeEventVisitor; import org.apache.flink.cdc.common.exceptions.SchemaEvolveException; import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException; import org.apache.flink.cdc.common.schema.Column; @@ -97,19 +98,34 @@ public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) catalog.open(); } - if (schemaChangeEvent instanceof CreateTableEvent) { - applyCreateTable((CreateTableEvent) schemaChangeEvent); - } else if (schemaChangeEvent instanceof AddColumnEvent) { - applyAddColumn((AddColumnEvent) schemaChangeEvent); - } else if (schemaChangeEvent instanceof DropColumnEvent) { - applyDropColumn((DropColumnEvent) schemaChangeEvent); - } else if (schemaChangeEvent instanceof RenameColumnEvent) { - applyRenameColumn((RenameColumnEvent) schemaChangeEvent); - } else if (schemaChangeEvent instanceof AlterColumnTypeEvent) { - applyAlterColumn((AlterColumnTypeEvent) schemaChangeEvent); - } else { - throw new UnsupportedSchemaChangeEventException(schemaChangeEvent); - } + SchemaChangeEventVisitor.visit( + schemaChangeEvent, + addColumnEvent -> { + applyAddColumn(addColumnEvent); + return null; + }, + alterColumnTypeEvent -> { + applyAlterColumnType(alterColumnTypeEvent); + return null; + }, + createTableEvent -> { + applyCreateTable(createTableEvent); + return null; + }, + dropColumnEvent -> { + applyDropColumn(dropColumnEvent); + return null; + }, + dropTableEvent -> { + throw new UnsupportedSchemaChangeEventException(dropTableEvent); + }, + renameColumnEvent -> { + applyRenameColumn(renameColumnEvent); + return null; + 
}, + truncateTableEvent -> { + throw new UnsupportedSchemaChangeEventException(truncateTableEvent); + }); } private void applyCreateTable(CreateTableEvent createTableEvent) throws SchemaEvolveException { @@ -289,7 +305,7 @@ private void applyRenameColumn(RenameColumnEvent renameColumnEvent) throw new UnsupportedSchemaChangeEventException(renameColumnEvent); } - private void applyAlterColumn(AlterColumnTypeEvent alterColumnTypeEvent) + private void applyAlterColumnType(AlterColumnTypeEvent alterColumnTypeEvent) throws SchemaEvolveException { // TODO There are limitations for data type conversions. We should know the data types // before and after changing so that we can make a validation. But the event only contains diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/ValuesDatabase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/ValuesDatabase.java index e19c4a844b..dfea5d0715 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/ValuesDatabase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/ValuesDatabase.java @@ -24,10 +24,12 @@ import org.apache.flink.cdc.common.event.CreateTableEvent; import org.apache.flink.cdc.common.event.DataChangeEvent; import org.apache.flink.cdc.common.event.DropColumnEvent; +import org.apache.flink.cdc.common.event.DropTableEvent; import org.apache.flink.cdc.common.event.RenameColumnEvent; import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.common.event.SchemaChangeEventType; import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.event.TruncateTableEvent; import 
org.apache.flink.cdc.common.exceptions.SchemaEvolveException; import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.common.schema.Schema; @@ -223,6 +225,13 @@ public static Schema getTableSchema(TableId tableId) { return builder.primaryKey(table.primaryKeys).build(); } + public static void applyTruncateTableEvent(TruncateTableEvent event) { + ValuesTable table = globalTables.get(event.tableId()); + Preconditions.checkNotNull(table, event.tableId() + " is not existed"); + table.applyTruncateTableEvent(event); + LOG.info("apply TruncateTableEvent: " + event); + } + public static void applyDataChangeEvent(DataChangeEvent event) { ValuesTable table = globalTables.get(event.tableId()); Preconditions.checkNotNull(table, event.tableId() + " is not existed"); @@ -237,6 +246,13 @@ public static void applySchemaChangeEvent(SchemaChangeEvent event) { globalTables.put( tableId, new ValuesTable(tableId, ((CreateTableEvent) event).getSchema())); } + } else if (event instanceof DropTableEvent) { + globalTables.remove(tableId); + } else if (event instanceof TruncateTableEvent) { + if (globalTables.containsKey(tableId)) { + ValuesTable table = globalTables.get(event.tableId()); + table.applyTruncateTableEvent((TruncateTableEvent) event); + } } else { ValuesTable table = globalTables.get(event.tableId()); Preconditions.checkNotNull(table, event.tableId() + " is not existed"); @@ -489,5 +505,9 @@ private void applyRenameColumnEvent(RenameColumnEvent event) { }); }); } + + private void applyTruncateTableEvent(TruncateTableEvent event) { + records.clear(); + } } } diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java index ada384e070..3ce48bce23 100644 --- 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java @@ -39,8 +39,6 @@ import java.sql.SQLException; import java.sql.Statement; import java.time.Duration; -import java.util.Arrays; -import java.util.List; import java.util.concurrent.TimeoutException; /** End-to-end tests for mysql cdc pipeline job. */ @@ -123,54 +121,24 @@ public void testSyncWholeDatabase() throws Exception { String.format( "DataChangeEvent{tableId=%s.products, before=[], after=[109, spare tire, 24 inch spare tire, 22.2, null, null, null], op=INSERT, meta=()}", mysqlInventoryDatabase.getDatabaseName())); - List expectedEvents = - Arrays.asList( - String.format( - "CreateTableEvent{tableId=%s.customers, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`address` VARCHAR(1024),`phone_number` VARCHAR(512)}, primaryKeys=id, options=()}", - mysqlInventoryDatabase.getDatabaseName()), - String.format( - "DataChangeEvent{tableId=%s.customers, before=[], after=[104, user_4, Shanghai, 123567891234], op=INSERT, meta=()}", - mysqlInventoryDatabase.getDatabaseName()), - String.format( - "DataChangeEvent{tableId=%s.customers, before=[], after=[103, user_3, Shanghai, 123567891234], op=INSERT, meta=()}", - mysqlInventoryDatabase.getDatabaseName()), - String.format( - "DataChangeEvent{tableId=%s.customers, before=[], after=[102, user_2, Shanghai, 123567891234], op=INSERT, meta=()}", - mysqlInventoryDatabase.getDatabaseName()), - String.format( - "DataChangeEvent{tableId=%s.customers, before=[], after=[101, user_1, Shanghai, 123567891234], op=INSERT, meta=()}", - mysqlInventoryDatabase.getDatabaseName()), - String.format( - "CreateTableEvent{tableId=%s.products, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`description` VARCHAR(512),`weight` FLOAT,`enum_c` STRING 'red',`json_c` 
STRING,`point_c` STRING}, primaryKeys=id, options=()}", - mysqlInventoryDatabase.getDatabaseName()), - String.format( - "DataChangeEvent{tableId=%s.products, before=[], after=[109, spare tire, 24 inch spare tire, 22.2, null, null, null], op=INSERT, meta=()}", - mysqlInventoryDatabase.getDatabaseName()), - String.format( - "DataChangeEvent{tableId=%s.products, before=[], after=[107, rocks, box of assorted rocks, 5.3, null, null, null], op=INSERT, meta=()}", - mysqlInventoryDatabase.getDatabaseName()), - String.format( - "DataChangeEvent{tableId=%s.products, before=[], after=[108, jacket, water resistent black wind breaker, 0.1, null, null, null], op=INSERT, meta=()}", - mysqlInventoryDatabase.getDatabaseName()), - String.format( - "DataChangeEvent{tableId=%s.products, before=[], after=[105, hammer, 14oz carpenter's hammer, 0.875, red, {\"k1\": \"v1\", \"k2\": \"v2\"}, {\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}", - mysqlInventoryDatabase.getDatabaseName()), - String.format( - "DataChangeEvent{tableId=%s.products, before=[], after=[106, hammer, 16oz carpenter's hammer, 1.0, null, null, null], op=INSERT, meta=()}", - mysqlInventoryDatabase.getDatabaseName()), - String.format( - "DataChangeEvent{tableId=%s.products, before=[], after=[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, red, {\"key3\": \"value3\"}, {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}", - mysqlInventoryDatabase.getDatabaseName()), - String.format( - "DataChangeEvent{tableId=%s.products, before=[], after=[104, hammer, 12oz carpenter's hammer, 0.75, white, {\"key4\": \"value4\"}, {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}", - mysqlInventoryDatabase.getDatabaseName()), - String.format( - "DataChangeEvent{tableId=%s.products, before=[], after=[101, scooter, Small 2-wheel scooter, 3.14, red, {\"key1\": \"value1\"}, {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}], 
op=INSERT, meta=()}", - mysqlInventoryDatabase.getDatabaseName()), - String.format( - "DataChangeEvent{tableId=%s.products, before=[], after=[102, car battery, 12V car battery, 8.1, white, {\"key2\": \"value2\"}, {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}", - mysqlInventoryDatabase.getDatabaseName())); - validateResult(expectedEvents); + + validateResult( + "CreateTableEvent{tableId=%s.customers, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`address` VARCHAR(1024),`phone_number` VARCHAR(512)}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=%s.customers, before=[], after=[104, user_4, Shanghai, 123567891234], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.customers, before=[], after=[103, user_3, Shanghai, 123567891234], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.customers, before=[], after=[102, user_2, Shanghai, 123567891234], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.customers, before=[], after=[101, user_1, Shanghai, 123567891234], op=INSERT, meta=()}", + "CreateTableEvent{tableId=%s.products, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`description` VARCHAR(512),`weight` FLOAT,`enum_c` STRING 'red',`json_c` STRING,`point_c` STRING}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[109, spare tire, 24 inch spare tire, 22.2, null, null, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[107, rocks, box of assorted rocks, 5.3, null, null, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[108, jacket, water resistent black wind breaker, 0.1, null, null, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[105, hammer, 14oz carpenter's hammer, 0.875, red, {\"k1\": \"v1\", \"k2\": \"v2\"}, {\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, 
before=[], after=[106, hammer, 16oz carpenter's hammer, 1.0, null, null, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, red, {\"key3\": \"value3\"}, {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[104, hammer, 12oz carpenter's hammer, 0.75, white, {\"key4\": \"value4\"}, {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[101, scooter, Small 2-wheel scooter, 3.14, red, {\"key1\": \"value1\"}, {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[102, car battery, 12V car battery, 8.1, white, {\"key2\": \"value2\"}, {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}"); + LOG.info("Begin incremental reading stage."); // generate binlogs String mysqlJdbcUrl = @@ -212,38 +180,159 @@ public void testSyncWholeDatabase() throws Exception { "DataChangeEvent{tableId=%s.products, before=[111, scooter, Big 2-wheel scooter , 5.17, null, null, null, 1], after=[], op=DELETE, meta=()}", mysqlInventoryDatabase.getDatabaseName())); - expectedEvents = - Arrays.asList( - String.format( - "DataChangeEvent{tableId=%s.products, before=[106, hammer, 16oz carpenter's hammer, 1.0, null, null, null], after=[106, hammer, 18oz carpenter hammer, 1.0, null, null, null], op=UPDATE, meta=()}", - mysqlInventoryDatabase.getDatabaseName()), - String.format( - "DataChangeEvent{tableId=%s.products, before=[107, rocks, box of assorted rocks, 5.3, null, null, null], after=[107, rocks, box of assorted rocks, 5.1, null, null, null], op=UPDATE, meta=()}", - mysqlInventoryDatabase.getDatabaseName()), - String.format( - "AddColumnEvent{tableId=%s.products, addedColumns=[ColumnWithPosition{column=`new_col` INT, 
position=LAST, existedColumnName=null}]}", - mysqlInventoryDatabase.getDatabaseName()), - String.format( - "DataChangeEvent{tableId=%s.products, before=[], after=[110, jacket, water resistent white wind breaker, 0.2, null, null, null, 1], op=INSERT, meta=()}", - mysqlInventoryDatabase.getDatabaseName()), - String.format( - "DataChangeEvent{tableId=%s.products, before=[], after=[111, scooter, Big 2-wheel scooter , 5.18, null, null, null, 1], op=INSERT, meta=()}", - mysqlInventoryDatabase.getDatabaseName()), - String.format( - "DataChangeEvent{tableId=%s.products, before=[110, jacket, water resistent white wind breaker, 0.2, null, null, null, 1], after=[110, jacket, new water resistent white wind breaker, 0.5, null, null, null, 1], op=UPDATE, meta=()}", - mysqlInventoryDatabase.getDatabaseName()), - String.format( - "DataChangeEvent{tableId=%s.products, before=[111, scooter, Big 2-wheel scooter , 5.18, null, null, null, 1], after=[111, scooter, Big 2-wheel scooter , 5.17, null, null, null, 1], op=UPDATE, meta=()}", - mysqlInventoryDatabase.getDatabaseName()), - String.format( - "DataChangeEvent{tableId=%s.products, before=[111, scooter, Big 2-wheel scooter , 5.17, null, null, null, 1], after=[], op=DELETE, meta=()}", - mysqlInventoryDatabase.getDatabaseName())); - validateResult(expectedEvents); + validateResult( + "DataChangeEvent{tableId=%s.products, before=[106, hammer, 16oz carpenter's hammer, 1.0, null, null, null], after=[106, hammer, 18oz carpenter hammer, 1.0, null, null, null], op=UPDATE, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[107, rocks, box of assorted rocks, 5.3, null, null, null], after=[107, rocks, box of assorted rocks, 5.1, null, null, null], op=UPDATE, meta=()}", + "AddColumnEvent{tableId=%s.products, addedColumns=[ColumnWithPosition{column=`new_col` INT, position=LAST, existedColumnName=null}]}", + "DataChangeEvent{tableId=%s.products, before=[], after=[110, jacket, water resistent white wind breaker, 0.2, null, null, null, 1], 
op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[111, scooter, Big 2-wheel scooter , 5.18, null, null, null, 1], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[110, jacket, water resistent white wind breaker, 0.2, null, null, null, 1], after=[110, jacket, new water resistent white wind breaker, 0.5, null, null, null, 1], op=UPDATE, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[111, scooter, Big 2-wheel scooter , 5.18, null, null, null, 1], after=[111, scooter, Big 2-wheel scooter , 5.17, null, null, null, 1], op=UPDATE, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[111, scooter, Big 2-wheel scooter , 5.17, null, null, null, 1], after=[], op=DELETE, meta=()}"); + } + + @Test + public void testSchemaChangeEvents() throws Exception { + String pipelineJob = + String.format( + "source:\n" + + " type: mysql\n" + + " hostname: %s\n" + + " port: 3306\n" + + " username: %s\n" + + " password: %s\n" + + " tables: %s.\\.*\n" + + " server-id: 5400-5404\n" + + " server-time-zone: UTC\n" + + "\n" + + "sink:\n" + + " type: values\n" + + "\n" + + "pipeline:\n" + + " parallelism: 1", + INTER_CONTAINER_MYSQL_ALIAS, + MYSQL_TEST_USER, + MYSQL_TEST_PASSWORD, + mysqlInventoryDatabase.getDatabaseName()); + Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); + Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar"); + Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); + submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar); + waitUntilJobRunning(Duration.ofSeconds(30)); + LOG.info("Pipeline job is running"); + waitUntilSpecificEvent( + String.format( + "DataChangeEvent{tableId=%s.customers, before=[], after=[104, user_4, Shanghai, 123567891234], op=INSERT, meta=()}", + mysqlInventoryDatabase.getDatabaseName())); + waitUntilSpecificEvent( + String.format( + "DataChangeEvent{tableId=%s.products, before=[], after=[109, spare tire, 
24 inch spare tire, 22.2, null, null, null], op=INSERT, meta=()}", + mysqlInventoryDatabase.getDatabaseName())); + + validateResult( + "CreateTableEvent{tableId=%s.customers, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`address` VARCHAR(1024),`phone_number` VARCHAR(512)}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=%s.customers, before=[], after=[104, user_4, Shanghai, 123567891234], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.customers, before=[], after=[103, user_3, Shanghai, 123567891234], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.customers, before=[], after=[102, user_2, Shanghai, 123567891234], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.customers, before=[], after=[101, user_1, Shanghai, 123567891234], op=INSERT, meta=()}", + "CreateTableEvent{tableId=%s.products, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`description` VARCHAR(512),`weight` FLOAT,`enum_c` STRING 'red',`json_c` STRING,`point_c` STRING}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[109, spare tire, 24 inch spare tire, 22.2, null, null, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[107, rocks, box of assorted rocks, 5.3, null, null, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[108, jacket, water resistent black wind breaker, 0.1, null, null, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[105, hammer, 14oz carpenter's hammer, 0.875, red, {\"k1\": \"v1\", \"k2\": \"v2\"}, {\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[106, hammer, 16oz carpenter's hammer, 1.0, null, null, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, red, 
{\"key3\": \"value3\"}, {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[104, hammer, 12oz carpenter's hammer, 0.75, white, {\"key4\": \"value4\"}, {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[101, scooter, Small 2-wheel scooter, 3.14, red, {\"key1\": \"value1\"}, {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[102, car battery, 12V car battery, 8.1, white, {\"key2\": \"value2\"}, {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}"); + + LOG.info("Begin incremental reading stage."); + // generate binlogs + String mysqlJdbcUrl = + String.format( + "jdbc:mysql://%s:%s/%s", + MYSQL.getHost(), + MYSQL.getDatabasePort(), + mysqlInventoryDatabase.getDatabaseName()); + try (Connection conn = + DriverManager.getConnection( + mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD); + Statement stat = conn.createStatement()) { + stat.execute("UPDATE products SET description='18oz carpenter hammer' WHERE id=106;"); + stat.execute("UPDATE products SET weight='5.1' WHERE id=107;"); + + // Perform DDL changes after the binlog is generated + waitUntilSpecificEvent( + String.format( + "DataChangeEvent{tableId=%s.products, before=[106, hammer, 16oz carpenter's hammer, 1.0, null, null, null], after=[106, hammer, 18oz carpenter hammer, 1.0, null, null, null], op=UPDATE, meta=()}", + mysqlInventoryDatabase.getDatabaseName())); + + LOG.info("Begin schema evolution stage."); + + // Test AddColumnEvent + stat.execute("ALTER TABLE products ADD COLUMN new_col INT;"); + stat.execute( + "INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2, null, null, null, 1);"); // 110 + stat.execute( + "INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18, null, null, 
null, 1);"); // 111 + stat.execute( + "UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;"); + stat.execute("UPDATE products SET weight='5.17' WHERE id=111;"); + stat.execute("DELETE FROM products WHERE id=111;"); + + // Test AlterColumnTypeEvent + stat.execute("ALTER TABLE products MODIFY COLUMN new_col BIGINT;"); + stat.execute( + "INSERT INTO products VALUES (default,'derrida','forever 21',2.1728, null, null, null, 2147483649);"); // 112 + + // Test RenameColumnEvent + stat.execute("ALTER TABLE products RENAME COLUMN new_col TO new_column;"); + stat.execute( + "INSERT INTO products VALUES (default,'dynazenon','SSSS',2.1728, null, null, null, 2147483649);"); // 113 + + // Test DropColumnEvent + stat.execute("ALTER TABLE products DROP COLUMN new_column;"); + stat.execute( + "INSERT INTO products VALUES (default,'evangelion','Eva',2.1728, null, null, null);"); // 114 + + // Test TruncateTableEvent + stat.execute("TRUNCATE TABLE products;"); + + // Test DropTableEvent. It's all over. 
+ stat.execute("DROP TABLE products;"); + } catch (SQLException e) { + LOG.error("Update table for CDC failed.", e); + throw e; + } + + waitUntilSpecificEvent( + String.format( + "DropTableEvent{tableId=%s.products}", + mysqlInventoryDatabase.getDatabaseName())); + + validateResult( + "DataChangeEvent{tableId=%s.products, before=[106, hammer, 16oz carpenter's hammer, 1.0, null, null, null], after=[106, hammer, 18oz carpenter hammer, 1.0, null, null, null], op=UPDATE, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[107, rocks, box of assorted rocks, 5.3, null, null, null], after=[107, rocks, box of assorted rocks, 5.1, null, null, null], op=UPDATE, meta=()}", + "AddColumnEvent{tableId=%s.products, addedColumns=[ColumnWithPosition{column=`new_col` INT, position=LAST, existedColumnName=null}]}", + "DataChangeEvent{tableId=%s.products, before=[], after=[110, jacket, water resistent white wind breaker, 0.2, null, null, null, 1], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[111, scooter, Big 2-wheel scooter , 5.18, null, null, null, 1], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[110, jacket, water resistent white wind breaker, 0.2, null, null, null, 1], after=[110, jacket, new water resistent white wind breaker, 0.5, null, null, null, 1], op=UPDATE, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[111, scooter, Big 2-wheel scooter , 5.18, null, null, null, 1], after=[111, scooter, Big 2-wheel scooter , 5.17, null, null, null, 1], op=UPDATE, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[111, scooter, Big 2-wheel scooter , 5.17, null, null, null, 1], after=[], op=DELETE, meta=()}", + "AlterColumnTypeEvent{tableId=%s.products, typeMapping={new_col=BIGINT}, oldTypeMapping={new_col=INT}}", + "DataChangeEvent{tableId=%s.products, before=[], after=[112, derrida, forever 21, 2.1728, null, null, null, 2147483649], op=INSERT, meta=()}", + "RenameColumnEvent{tableId=%s.products, 
nameMapping={new_col=new_column}}", + "DataChangeEvent{tableId=%s.products, before=[], after=[113, dynazenon, SSSS, 2.1728, null, null, null, 2147483649], op=INSERT, meta=()}", + "DropColumnEvent{tableId=%s.products, droppedColumnNames=[new_column]}", + "DataChangeEvent{tableId=%s.products, before=[], after=[114, evangelion, Eva, 2.1728, null, null, null], op=INSERT, meta=()}", + "TruncateTableEvent{tableId=%s.products}", + "DropTableEvent{tableId=%s.products}"); } - private void validateResult(List expectedEvents) throws Exception { + private void validateResult(String... expectedEvents) throws Exception { + String dbName = mysqlInventoryDatabase.getDatabaseName(); for (String event : expectedEvents) { - waitUntilSpecificEvent(event); + waitUntilSpecificEvent(String.format(event, dbName, dbName)); } } diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java index ab7ed178e6..8b3c907b89 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java @@ -221,7 +221,7 @@ public void testDefaultRoute() throws Exception { "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[10001, 12, Derrida], op=INSERT, meta=()}", "RenameColumnEvent{tableId=%s.TABLEBETA, nameMapping={VERSION=VERSION_EX}}", "DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[10002, 15], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId=%s.TABLEGAMMA, nameMapping={VERSION=VARCHAR(19)}}", + "AlterColumnTypeEvent{tableId=%s.TABLEGAMMA, typeMapping={VERSION=VARCHAR(19)}, oldTypeMapping={VERSION=VARCHAR(17)}}", "RenameColumnEvent{tableId=%s.TABLEGAMMA, nameMapping={VERSION=VERSION_EX}}", 
"DataChangeEvent{tableId=%s.TABLEGAMMA, before=[], after=[10003, Fluorite], op=INSERT, meta=()}", "DropColumnEvent{tableId=%s.TABLEDELTA, droppedColumnNames=[VERSION]}"); @@ -305,7 +305,7 @@ public void testMergeTableRoute() throws Exception { "DataChangeEvent{tableId=%s.ALL, before=[], after=[10001, 12, Derrida], op=INSERT, meta=()}", "AddColumnEvent{tableId=%s.ALL, addedColumns=[ColumnWithPosition{column=`VERSION_EX` VARCHAR(17), position=LAST, existedColumnName=null}]}", "DataChangeEvent{tableId=%s.ALL, before=[], after=[10002, null, null, 15], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId=%s.ALL, nameMapping={VERSION=STRING}}", + "AlterColumnTypeEvent{tableId=%s.ALL, typeMapping={VERSION=STRING}, oldTypeMapping={VERSION=VARCHAR(17)}}", "DataChangeEvent{tableId=%s.ALL, before=[], after=[10003, null, null, Fluorite], op=INSERT, meta=()}", "DataChangeEvent{tableId=%s.ALL, before=[], after=[10004, null, null, null], op=INSERT, meta=()}"); } @@ -398,7 +398,7 @@ public void testPartialRoute() throws Exception { "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[], after=[10001, 12, Derrida], op=INSERT, meta=()}", "AddColumnEvent{tableId=NEW_%s.ALPHABET, addedColumns=[ColumnWithPosition{column=`VERSION_EX` VARCHAR(17), position=LAST, existedColumnName=null}]}", "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[], after=[10002, null, null, 15], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId=%s.TABLEGAMMA, nameMapping={VERSION=VARCHAR(19)}}", + "AlterColumnTypeEvent{tableId=%s.TABLEGAMMA, typeMapping={VERSION=VARCHAR(19)}, oldTypeMapping={VERSION=VARCHAR(17)}}", "RenameColumnEvent{tableId=%s.TABLEGAMMA, nameMapping={VERSION=VERSION_EX}}", "DataChangeEvent{tableId=%s.TABLEGAMMA, before=[], after=[10003, Fluorite], op=INSERT, meta=()}", "DropColumnEvent{tableId=%s.TABLEDELTA, droppedColumnNames=[VERSION]}", @@ -504,7 +504,7 @@ public void testMultipleRoute() throws Exception { "AddColumnEvent{tableId=NEW_%s.BETAGAMM, 
addedColumns=[ColumnWithPosition{column=`VERSION_EX` VARCHAR(17), position=LAST, existedColumnName=null}]}", "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[], after=[10002, null, null, 15], op=INSERT, meta=()}", "DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[], after=[10002, null, 15], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId=NEW_%s.BETAGAMM, nameMapping={VERSION=STRING}}", + "AlterColumnTypeEvent{tableId=NEW_%s.BETAGAMM, typeMapping={VERSION=STRING}, oldTypeMapping={VERSION=VARCHAR(17)}}", "DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[], after=[10003, null, Fluorite], op=INSERT, meta=()}", "DropColumnEvent{tableId=%s.TABLEDELTA, droppedColumnNames=[VERSION]}", "DataChangeEvent{tableId=%s.TABLEDELTA, before=[], after=[10004], op=INSERT, meta=()}"); @@ -616,7 +616,7 @@ public void testOneToManyRoute() throws Exception { "DataChangeEvent{tableId=NEW_%s.TABLEC, before=[], after=[10001, 12, Derrida], op=INSERT, meta=()}", "RenameColumnEvent{tableId=%s.TABLEBETA, nameMapping={VERSION=VERSION_EX}}", "DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[10002, 15], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId=%s.TABLEGAMMA, nameMapping={VERSION=VARCHAR(19)}}", + "AlterColumnTypeEvent{tableId=%s.TABLEGAMMA, typeMapping={VERSION=VARCHAR(19)}, oldTypeMapping={VERSION=VARCHAR(17)}}", "RenameColumnEvent{tableId=%s.TABLEGAMMA, nameMapping={VERSION=VERSION_EX}}", "DataChangeEvent{tableId=%s.TABLEGAMMA, before=[], after=[10003, Fluorite], op=INSERT, meta=()}", "DropColumnEvent{tableId=%s.TABLEDELTA, droppedColumnNames=[VERSION]}", @@ -706,7 +706,7 @@ public void testMergeTableRouteWithTransform() throws Exception { "DataChangeEvent{tableId=%s.ALL, before=[], after=[10001, 12, Derrida, extras], op=INSERT, meta=()}", "AddColumnEvent{tableId=%s.ALL, addedColumns=[ColumnWithPosition{column=`VERSION_EX` VARCHAR(17), position=LAST, existedColumnName=null}]}", "DataChangeEvent{tableId=%s.ALL, before=[], after=[10002, null, extras, null, 15], op=INSERT, 
meta=()}", - "AlterColumnTypeEvent{tableId=%s.ALL, nameMapping={VERSION=STRING}}", + "AlterColumnTypeEvent{tableId=%s.ALL, typeMapping={VERSION=STRING}, oldTypeMapping={VERSION=VARCHAR(17)}}", "DataChangeEvent{tableId=%s.ALL, before=[], after=[10003, null, extras, null, Fluorite], op=INSERT, meta=()}", "DataChangeEvent{tableId=%s.ALL, before=[], after=[10004, null, extras, null, null], op=INSERT, meta=()}"); } @@ -800,7 +800,7 @@ public void testReplacementSymbol() throws Exception { "DataChangeEvent{tableId=NEW_%s.NEW_TABLEALPHA, before=[], after=[10001, 12, Derrida], op=INSERT, meta=()}", "RenameColumnEvent{tableId=NEW_%s.NEW_TABLEBETA, nameMapping={VERSION=VERSION_EX}}", "DataChangeEvent{tableId=NEW_%s.NEW_TABLEBETA, before=[], after=[10002, 15], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId=NEW_%s.NEW_TABLEGAMMA, nameMapping={VERSION=VARCHAR(19)}}", + "AlterColumnTypeEvent{tableId=NEW_%s.NEW_TABLEGAMMA, typeMapping={VERSION=VARCHAR(19)}, oldTypeMapping={VERSION=VARCHAR(17)}}", "RenameColumnEvent{tableId=NEW_%s.NEW_TABLEGAMMA, nameMapping={VERSION=VERSION_EX}}", "DataChangeEvent{tableId=NEW_%s.NEW_TABLEGAMMA, before=[], after=[10003, Fluorite], op=INSERT, meta=()}", "DropColumnEvent{tableId=NEW_%s.NEW_TABLEDELTA, droppedColumnNames=[VERSION]}", diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java index d187853daf..8225558fcc 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java @@ -96,11 +96,14 @@ public void testSchemaEvolve() throws Exception { Arrays.asList( "AddColumnEvent{tableId=%s.members, 
addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}", "DataChangeEvent{tableId=%s.members, before=[], after=[1012, Eve, 17, 0], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId=%s.members, nameMapping={age=DOUBLE}}", + "AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}", "RenameColumnEvent{tableId=%s.members, nameMapping={age=precise_age}}", "RenameColumnEvent{tableId=%s.members, nameMapping={gender=biological_sex}}", "DropColumnEvent{tableId=%s.members, droppedColumnNames=[biological_sex]}", - "DataChangeEvent{tableId=%s.members, before=[], after=[1013, Fiona, 16.0], op=INSERT, meta=()}")); + "DataChangeEvent{tableId=%s.members, before=[], after=[1013, Fiona, 16.0], op=INSERT, meta=()}", + "TruncateTableEvent{tableId=%s.members}", + "DataChangeEvent{tableId=%s.members, before=[], after=[1014, Gem, 17.0], op=INSERT, meta=()}", + "DropTableEvent{tableId=%s.members}")); } @Test @@ -157,7 +160,8 @@ public void testSchemaIgnore() throws Exception { false, Arrays.asList( "DataChangeEvent{tableId=%s.members, before=[], after=[1012, Eve, 17], op=INSERT, meta=()}", - "DataChangeEvent{tableId=%s.members, before=[], after=[1013, Fiona, null], op=INSERT, meta=()}")); + "DataChangeEvent{tableId=%s.members, before=[], after=[1013, Fiona, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.members, before=[], after=[1014, Gem, null], op=INSERT, meta=()}")); } @Test @@ -183,10 +187,17 @@ public void testLenientSchemaEvolution() throws Exception { Arrays.asList( "AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}", "DataChangeEvent{tableId=%s.members, before=[], after=[1012, Eve, 17, 0], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId=%s.members, nameMapping={age=DOUBLE}}", + "AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}", 
"AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`precise_age` DOUBLE, position=LAST, existedColumnName=null}]}", "AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`biological_sex` TINYINT, position=LAST, existedColumnName=null}]}", - "DataChangeEvent{tableId=%s.members, before=[], after=[1013, Fiona, null, null, 16.0, null], op=INSERT, meta=()}")); + "DataChangeEvent{tableId=%s.members, before=[], after=[1013, Fiona, null, null, 16.0, null], op=INSERT, meta=()}", + "TruncateTableEvent{tableId=%s.members}", + "DataChangeEvent{tableId=%s.members, before=[], after=[1014, Gem, null, null, 17.0, null], op=INSERT, meta=()}")); + + assertNotExists( + Collections.singletonList( + "Applied schema change event DropTableEvent{tableId=%s.members}"), + taskManagerConsumer); } @Test @@ -200,12 +211,15 @@ public void testFineGrainedSchemaEvolution() throws Exception { Arrays.asList( "AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}", "DataChangeEvent{tableId=%s.members, before=[], after=[1012, Eve, 17, 0], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId=%s.members, nameMapping={age=DOUBLE}}", + "AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}", "RenameColumnEvent{tableId=%s.members, nameMapping={age=precise_age}}", "RenameColumnEvent{tableId=%s.members, nameMapping={gender=biological_sex}}", - "DataChangeEvent{tableId=%s.members, before=[], after=[1013, Fiona, 16.0, null], op=INSERT, meta=()}"), - Collections.singletonList( - "Ignored schema change DropColumnEvent{tableId=%s.members, droppedColumnNames=[biological_sex]} to table %s.members.")); + "DataChangeEvent{tableId=%s.members, before=[], after=[1013, Fiona, 16.0, null], op=INSERT, meta=()}", + "TruncateTableEvent{tableId=%s.members}", + "DataChangeEvent{tableId=%s.members, before=[], after=[1014, Gem, 17.0, null], op=INSERT, meta=()}"), 
+ Arrays.asList( + "Ignored schema change DropColumnEvent{tableId=%s.members, droppedColumnNames=[biological_sex]} to table %s.members.", + "Ignored schema change DropTableEvent{tableId=%s.members} to table %s.members.")); } @Test @@ -337,6 +351,13 @@ private void testGenericSchemaEvolution( // triggers DropColumnEvent stmt.execute("ALTER TABLE members DROP COLUMN biological_sex"); stmt.execute("INSERT INTO members VALUES (1013, 'Fiona', 16);"); + + // triggers TruncateTableEvent + stmt.execute("TRUNCATE TABLE members;"); + stmt.execute("INSERT INTO members VALUES (1014, 'Gem', 17);"); + + // triggers DropTableEvent + stmt.execute("DROP TABLE members;"); } List expectedTmEvents = @@ -387,6 +408,15 @@ private void validateResult(List expectedEvents, ToStringConsumer consum } } + private void assertNotExists(List unexpectedEvents, ToStringConsumer consumer) { + String consumerLog = consumer.toUtf8String(); + for (String event : unexpectedEvents) { + Assert.assertFalse( + consumerLog.contains( + String.format(event, schemaEvolveDatabase.getDatabaseName()))); + } + } + private void waitUntilSpecificEvent(String event, ToStringConsumer consumer) throws Exception { boolean result = false; long endTimeout = System.currentTimeMillis() + SchemaEvolveE2eITCase.EVENT_WAITING_TIMEOUT; diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivation.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivation.java index e4b547b216..9883f3d5f5 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivation.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivation.java @@ -21,10 +21,10 @@ import org.apache.flink.cdc.common.event.AddColumnEvent; import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; import 
org.apache.flink.cdc.common.event.CreateTableEvent; -import org.apache.flink.cdc.common.event.DropColumnEvent; import org.apache.flink.cdc.common.event.RenameColumnEvent; import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.event.visitor.SchemaChangeEventVisitor; import org.apache.flink.cdc.common.route.RouteRule; import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.common.schema.PhysicalColumn; @@ -47,6 +47,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -111,37 +112,43 @@ public List applySchemaChange(SchemaChangeEvent schemaChangeE // multiple source mapping (merging tables) Schema derivedTableSchema = schemaManager.getLatestEvolvedSchema(derivedTable).get(); - if (schemaChangeEvent instanceof CreateTableEvent) { - events.addAll( - handleCreateTableEvent( - (CreateTableEvent) schemaChangeEvent, - derivedTableSchema, - derivedTable)); - } else if (schemaChangeEvent instanceof AddColumnEvent) { - events.addAll( - handleAddColumnEvent( - (AddColumnEvent) schemaChangeEvent, - derivedTableSchema, - derivedTable)); - } else if (schemaChangeEvent instanceof AlterColumnTypeEvent) { - events.addAll( - handleAlterColumnTypeEvent( - (AlterColumnTypeEvent) schemaChangeEvent, - derivedTableSchema, - derivedTable)); - } else if (schemaChangeEvent instanceof DropColumnEvent) { - // Do nothing: drop column event should not be sent to downstream - } else if (schemaChangeEvent instanceof RenameColumnEvent) { - events.addAll( - handleRenameColumnEvent( - (RenameColumnEvent) schemaChangeEvent, - derivedTableSchema, - derivedTable)); - } else { - throw new IllegalStateException( - String.format( - "Unrecognized SchemaChangeEvent type: %s", schemaChangeEvent)); - } + events.addAll( + Objects.requireNonNull( + 
SchemaChangeEventVisitor.visit( + schemaChangeEvent, + addColumnEvent -> + handleAddColumnEvent( + addColumnEvent, + derivedTableSchema, + derivedTable), + alterColumnTypeEvent -> + handleAlterColumnTypeEvent( + alterColumnTypeEvent, + derivedTableSchema, + derivedTable), + createTableEvent -> + handleCreateTableEvent( + createTableEvent, + derivedTableSchema, + derivedTable), + dropColumnEvent -> + Collections.emptyList(), // Column drop shouldn't be + // spread to route + // destination. + dropTableEvent -> + Collections.emptyList(), // Table drop shouldn't be + // spread to route + // destination. + renameColumnEvent -> + handleRenameColumnEvent( + renameColumnEvent, + derivedTableSchema, + derivedTable), + truncateTableEvent -> + Collections.emptyList() // Table truncation + // shouldn't be spread to route + // destination. + ))); } } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java index da88753e58..c47d556f64 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java @@ -26,6 +26,7 @@ import org.apache.flink.cdc.common.event.RenameColumnEvent; import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.common.event.SchemaChangeEventType; +import org.apache.flink.cdc.common.event.SchemaChangeEventWithPreSchema; import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.exceptions.SchemaEvolveException; import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; @@ -175,14 +176,26 @@ public CompletableFuture handleSchemaChangeRequest( LOG.info( "Received schema change event request 
from table {}. Start to buffer requests for others.", request.getTableId().toString()); - if (request.getSchemaChangeEvent() instanceof CreateTableEvent + SchemaChangeEvent event = request.getSchemaChangeEvent(); + if (event instanceof CreateTableEvent && schemaManager.originalSchemaExists(request.getTableId())) { return CompletableFuture.completedFuture( wrap(new SchemaChangeResponse(Collections.emptyList()))); } - schemaManager.applyOriginalSchemaChange(request.getSchemaChangeEvent()); + schemaManager.applyOriginalSchemaChange(event); List derivedSchemaChangeEvents = calculateDerivedSchemaChangeEvents(request.getSchemaChangeEvent()); + derivedSchemaChangeEvents.forEach( + e -> { + if (e instanceof SchemaChangeEventWithPreSchema) { + SchemaChangeEventWithPreSchema pe = (SchemaChangeEventWithPreSchema) e; + if (!pe.hasPreSchema()) { + schemaManager + .getLatestEvolvedSchema(pe.tableId()) + .ifPresent(pe::fillPreSchema); + } + } + }); CompletableFuture response = CompletableFuture.completedFuture( wrap(new SchemaChangeResponse(derivedSchemaChangeEvents))); @@ -415,6 +428,10 @@ private List lenientizeSchemaChangeEvent(SchemaChangeEvent ev } return events; } + case DROP_TABLE: + // We don't drop any tables in Lenient mode. 
+ LOG.info("A drop table event {} has been ignored in Lenient mode.", event); + return Collections.emptyList(); default: return Collections.singletonList(event); } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java index b8dc2448e3..65fd81c669 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java @@ -24,9 +24,11 @@ import org.apache.flink.cdc.common.data.binary.BinaryRecordData; import org.apache.flink.cdc.common.event.CreateTableEvent; import org.apache.flink.cdc.common.event.DataChangeEvent; +import org.apache.flink.cdc.common.event.DropTableEvent; import org.apache.flink.cdc.common.event.Event; import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.event.TruncateTableEvent; import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.schema.Selectors; import org.apache.flink.cdc.common.utils.SchemaUtils; @@ -223,16 +225,18 @@ public void processElement(StreamRecord element) throws Exception { if (event instanceof CreateTableEvent) { CreateTableEvent createTableEvent = (CreateTableEvent) event; preTransformProcessorMap.remove(createTableEvent.tableId()); - event = cacheCreateTable(createTableEvent); + output.collect(new StreamRecord<>(cacheCreateTable(createTableEvent))); + } else if (event instanceof DropTableEvent) { + output.collect(new StreamRecord<>(event)); + } else if (event instanceof TruncateTableEvent) { output.collect(new StreamRecord<>(event)); } else if (event instanceof SchemaChangeEvent) { SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event; 
preTransformProcessorMap.remove(schemaChangeEvent.tableId()); - event = cacheChangeSchema(schemaChangeEvent); + cacheChangeSchema(schemaChangeEvent); output.collect(new StreamRecord<>(event)); } else if (event instanceof DataChangeEvent) { - DataChangeEvent dataChangeEvent = processDataChangeEvent(((DataChangeEvent) event)); - output.collect(new StreamRecord<>(dataChangeEvent)); + output.collect(new StreamRecord<>(processDataChangeEvent(((DataChangeEvent) event)))); } } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/event/AlterColumnTypeEventSerializer.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/event/AlterColumnTypeEventSerializer.java index ab228e4d6b..978b9b87ec 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/event/AlterColumnTypeEventSerializer.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/event/AlterColumnTypeEventSerializer.java @@ -60,7 +60,9 @@ public AlterColumnTypeEvent createInstance() { @Override public AlterColumnTypeEvent copy(AlterColumnTypeEvent from) { return new AlterColumnTypeEvent( - from.tableId(), typeMapSerializer.copy(from.getTypeMapping())); + from.tableId(), + typeMapSerializer.copy(from.getTypeMapping()), + typeMapSerializer.copy(from.getOldTypeMapping())); } @Override @@ -77,12 +79,15 @@ public int getLength() { public void serialize(AlterColumnTypeEvent record, DataOutputView target) throws IOException { tableIdSerializer.serialize(record.tableId(), target); typeMapSerializer.serialize(record.getTypeMapping(), target); + typeMapSerializer.serialize(record.getOldTypeMapping(), target); } @Override public AlterColumnTypeEvent deserialize(DataInputView source) throws IOException { return new AlterColumnTypeEvent( - tableIdSerializer.deserialize(source), typeMapSerializer.deserialize(source)); + tableIdSerializer.deserialize(source), + typeMapSerializer.deserialize(source), + 
typeMapSerializer.deserialize(source)); } @Override diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/event/DropTableEventSerializer.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/event/DropTableEventSerializer.java new file mode 100644 index 0000000000..b5f8a0d339 --- /dev/null +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/event/DropTableEventSerializer.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.runtime.serializer.event; + +import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.cdc.common.event.DropTableEvent; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.runtime.serializer.MapSerializer; +import org.apache.flink.cdc.runtime.serializer.StringSerializer; +import org.apache.flink.cdc.runtime.serializer.TableIdSerializer; +import org.apache.flink.cdc.runtime.serializer.TypeSerializerSingleton; +import org.apache.flink.cdc.runtime.serializer.schema.DataTypeSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +import java.io.IOException; + +/** A {@link TypeSerializer} for {@link DropTableEvent}. */ +public class DropTableEventSerializer extends TypeSerializerSingleton { + + private static final long serialVersionUID = 1L; + + /** Sharable instance of the DropTableEventSerializer. 
*/ + public static final DropTableEventSerializer INSTANCE = new DropTableEventSerializer(); + + private final TableIdSerializer tableIdSerializer = TableIdSerializer.INSTANCE; + private final MapSerializer typeMapSerializer = + new MapSerializer<>(StringSerializer.INSTANCE, new DataTypeSerializer()); + + @Override + public boolean isImmutableType() { + return false; + } + + @Override + public DropTableEvent createInstance() { + return new DropTableEvent(TableId.tableId("unknown")); + } + + @Override + public DropTableEvent copy(DropTableEvent from) { + return new DropTableEvent(from.tableId()); + } + + @Override + public DropTableEvent copy(DropTableEvent from, DropTableEvent reuse) { + return copy(from); + } + + @Override + public int getLength() { + return -1; + } + + @Override + public void serialize(DropTableEvent record, DataOutputView target) throws IOException { + tableIdSerializer.serialize(record.tableId(), target); + } + + @Override + public DropTableEvent deserialize(DataInputView source) throws IOException { + return new DropTableEvent(tableIdSerializer.deserialize(source)); + } + + @Override + public DropTableEvent deserialize(DropTableEvent reuse, DataInputView source) + throws IOException { + return deserialize(source); + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + serialize(deserialize(source), target); + } + + @Override + public TypeSerializerSnapshot snapshotConfiguration() { + return new DropTableEventSerializer.DropTableEventSerializerSnapshot(); + } + + /** Serializer configuration snapshot for compatibility and format evolution. 
*/ + @SuppressWarnings("WeakerAccess") + public static final class DropTableEventSerializerSnapshot + extends SimpleTypeSerializerSnapshot { + + public DropTableEventSerializerSnapshot() { + super(() -> INSTANCE); + } + } +} diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/event/SchemaChangeEventSerializer.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/event/SchemaChangeEventSerializer.java index 85de2fd7e0..4f556b5cad 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/event/SchemaChangeEventSerializer.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/event/SchemaChangeEventSerializer.java @@ -20,14 +20,10 @@ import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; -import org.apache.flink.cdc.common.event.AddColumnEvent; -import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; -import org.apache.flink.cdc.common.event.CreateTableEvent; -import org.apache.flink.cdc.common.event.DropColumnEvent; -import org.apache.flink.cdc.common.event.RenameColumnEvent; import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.common.event.SchemaChangeEventType; import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.event.visitor.SchemaChangeEventVisitor; import org.apache.flink.cdc.runtime.serializer.EnumSerializer; import org.apache.flink.cdc.runtime.serializer.TypeSerializerSingleton; import org.apache.flink.core.memory.DataInputView; @@ -35,6 +31,14 @@ import java.io.IOException; +import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ADD_COLUMN; +import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ALTER_COLUMN_TYPE; +import static org.apache.flink.cdc.common.event.SchemaChangeEventType.CREATE_TABLE; 
+import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_COLUMN; +import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_TABLE; +import static org.apache.flink.cdc.common.event.SchemaChangeEventType.RENAME_COLUMN; +import static org.apache.flink.cdc.common.event.SchemaChangeEventType.TRUNCATE_TABLE; + /** A {@link TypeSerializer} for {@link SchemaChangeEvent}. */ public final class SchemaChangeEventSerializer extends TypeSerializerSingleton { @@ -55,32 +59,28 @@ public boolean isImmutableType() { public SchemaChangeEvent createInstance() { return new SchemaChangeEvent() { @Override - public TableId tableId() { - return TableId.tableId("unknown", "unknown", "unknown"); + public SchemaChangeEventType getType() { + return null; } @Override - public SchemaChangeEventType getType() { - return null; + public TableId tableId() { + return TableId.tableId("unknown", "unknown", "unknown"); } }; } @Override public SchemaChangeEvent copy(SchemaChangeEvent from) { - if (from instanceof AlterColumnTypeEvent) { - return AlterColumnTypeEventSerializer.INSTANCE.copy((AlterColumnTypeEvent) from); - } else if (from instanceof CreateTableEvent) { - return CreateTableEventSerializer.INSTANCE.copy((CreateTableEvent) from); - } else if (from instanceof RenameColumnEvent) { - return RenameColumnEventSerializer.INSTANCE.copy((RenameColumnEvent) from); - } else if (from instanceof AddColumnEvent) { - return AddColumnEventSerializer.INSTANCE.copy((AddColumnEvent) from); - } else if (from instanceof DropColumnEvent) { - return DropColumnEventSerializer.INSTANCE.copy((DropColumnEvent) from); - } else { - throw new IllegalArgumentException("Unknown schema change event: " + from); - } + return SchemaChangeEventVisitor.visit( + from, + AddColumnEventSerializer.INSTANCE::copy, + AlterColumnTypeEventSerializer.INSTANCE::copy, + CreateTableEventSerializer.INSTANCE::copy, + DropColumnEventSerializer.INSTANCE::copy, + DropTableEventSerializer.INSTANCE::copy, + 
RenameColumnEventSerializer.INSTANCE::copy, + TruncateTableEventSerializer.INSTANCE::copy); } @Override @@ -95,25 +95,44 @@ public int getLength() { @Override public void serialize(SchemaChangeEvent record, DataOutputView target) throws IOException { - if (record instanceof AlterColumnTypeEvent) { - enumSerializer.serialize(SchemaChangeEventType.ALTER_COLUMN_TYPE, target); - AlterColumnTypeEventSerializer.INSTANCE.serialize( - (AlterColumnTypeEvent) record, target); - } else if (record instanceof CreateTableEvent) { - enumSerializer.serialize(SchemaChangeEventType.CREATE_TABLE, target); - CreateTableEventSerializer.INSTANCE.serialize((CreateTableEvent) record, target); - } else if (record instanceof RenameColumnEvent) { - enumSerializer.serialize(SchemaChangeEventType.RENAME_COLUMN, target); - RenameColumnEventSerializer.INSTANCE.serialize((RenameColumnEvent) record, target); - } else if (record instanceof AddColumnEvent) { - enumSerializer.serialize(SchemaChangeEventType.ADD_COLUMN, target); - AddColumnEventSerializer.INSTANCE.serialize((AddColumnEvent) record, target); - } else if (record instanceof DropColumnEvent) { - enumSerializer.serialize(SchemaChangeEventType.DROP_COLUMN, target); - DropColumnEventSerializer.INSTANCE.serialize((DropColumnEvent) record, target); - } else { - throw new IllegalArgumentException("Unknown schema change event: " + record); - } + + SchemaChangeEventVisitor.visit( + record, + addColumnEvent -> { + enumSerializer.serialize(ADD_COLUMN, target); + AddColumnEventSerializer.INSTANCE.serialize(addColumnEvent, target); + return null; + }, + alterColumnTypeEvent -> { + enumSerializer.serialize(ALTER_COLUMN_TYPE, target); + AlterColumnTypeEventSerializer.INSTANCE.serialize(alterColumnTypeEvent, target); + return null; + }, + createTableEvent -> { + enumSerializer.serialize(CREATE_TABLE, target); + CreateTableEventSerializer.INSTANCE.serialize(createTableEvent, target); + return null; + }, + dropColumnEvent -> { + 
enumSerializer.serialize(DROP_COLUMN, target); + DropColumnEventSerializer.INSTANCE.serialize(dropColumnEvent, target); + return null; + }, + dropTableEvent -> { + enumSerializer.serialize(DROP_TABLE, target); + DropTableEventSerializer.INSTANCE.serialize(dropTableEvent, target); + return null; + }, + renameColumnEvent -> { + enumSerializer.serialize(RENAME_COLUMN, target); + RenameColumnEventSerializer.INSTANCE.serialize(renameColumnEvent, target); + return null; + }, + truncateTableEvent -> { + enumSerializer.serialize(TRUNCATE_TABLE, target); + TruncateTableEventSerializer.INSTANCE.serialize(truncateTableEvent, target); + return null; + }); } @Override @@ -130,6 +149,10 @@ public SchemaChangeEvent deserialize(DataInputView source) throws IOException { return RenameColumnEventSerializer.INSTANCE.deserialize(source); case ALTER_COLUMN_TYPE: return AlterColumnTypeEventSerializer.INSTANCE.deserialize(source); + case DROP_TABLE: + return DropTableEventSerializer.INSTANCE.deserialize(source); + case TRUNCATE_TABLE: + return TruncateTableEventSerializer.INSTANCE.deserialize(source); default: throw new IllegalArgumentException( "Unknown schema change event class: " + schemaChangeEventType); diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/event/TruncateTableEventSerializer.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/event/TruncateTableEventSerializer.java new file mode 100644 index 0000000000..26eb6e744b --- /dev/null +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/event/TruncateTableEventSerializer.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.runtime.serializer.event; + +import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.event.TruncateTableEvent; +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.runtime.serializer.MapSerializer; +import org.apache.flink.cdc.runtime.serializer.StringSerializer; +import org.apache.flink.cdc.runtime.serializer.TableIdSerializer; +import org.apache.flink.cdc.runtime.serializer.TypeSerializerSingleton; +import org.apache.flink.cdc.runtime.serializer.schema.DataTypeSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +import java.io.IOException; + +/** A {@link TypeSerializer} for {@link TruncateTableEvent}. */ +public class TruncateTableEventSerializer extends TypeSerializerSingleton { + + private static final long serialVersionUID = 1L; + + /** Sharable instance of the TruncateTableEventSerializer. 
*/ + public static final TruncateTableEventSerializer INSTANCE = new TruncateTableEventSerializer(); + + private final TableIdSerializer tableIdSerializer = TableIdSerializer.INSTANCE; + private final MapSerializer typeMapSerializer = + new MapSerializer<>(StringSerializer.INSTANCE, new DataTypeSerializer()); + + @Override + public boolean isImmutableType() { + return false; + } + + @Override + public TruncateTableEvent createInstance() { + return new TruncateTableEvent(TableId.tableId("unknown")); + } + + @Override + public TruncateTableEvent copy(TruncateTableEvent from) { + return new TruncateTableEvent(from.tableId()); + } + + @Override + public TruncateTableEvent copy(TruncateTableEvent from, TruncateTableEvent reuse) { + return copy(from); + } + + @Override + public int getLength() { + return -1; + } + + @Override + public void serialize(TruncateTableEvent record, DataOutputView target) throws IOException { + tableIdSerializer.serialize(record.tableId(), target); + } + + @Override + public TruncateTableEvent deserialize(DataInputView source) throws IOException { + return new TruncateTableEvent(tableIdSerializer.deserialize(source)); + } + + @Override + public TruncateTableEvent deserialize(TruncateTableEvent reuse, DataInputView source) + throws IOException { + return deserialize(source); + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + serialize(deserialize(source), target); + } + + @Override + public TypeSerializerSnapshot snapshotConfiguration() { + return new TruncateTableEventSerializer.TruncateTableEventSerializerSnapshot(); + } + + /** Serializer configuration snapshot for compatibility and format evolution. 
*/ + @SuppressWarnings("WeakerAccess") + public static final class TruncateTableEventSerializerSnapshot + extends SimpleTypeSerializerSnapshot { + + public TruncateTableEventSerializerSnapshot() { + super(() -> INSTANCE); + } + } +} diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java index bf61ad2600..325aee7a71 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java @@ -249,7 +249,9 @@ public void testEvolveSchema() throws Exception { List alterColumnTypeEvents = Arrays.asList( new AlterColumnTypeEvent( - tableId, ImmutableMap.of("score", BIGINT, "toshi", FLOAT)), + tableId, + ImmutableMap.of("score", BIGINT, "toshi", FLOAT), + ImmutableMap.of("score", INT, "toshi", SMALLINT)), DataChangeEvent.insertEvent( tableId, buildRecord( @@ -504,7 +506,9 @@ public void testTryEvolveSchema() throws Exception { List alterColumnTypeEvents = Arrays.asList( new AlterColumnTypeEvent( - tableId, ImmutableMap.of("score", BIGINT, "toshi", FLOAT)), + tableId, + ImmutableMap.of("score", BIGINT, "toshi", FLOAT), + ImmutableMap.of("score", INT, "toshi", SMALLINT)), DataChangeEvent.insertEvent( tableId, buildRecord( @@ -1828,7 +1832,9 @@ public void testLenientSchemaEvolves() throws Exception { Column.physicalColumn( "toshi", SMALLINT, null)))), new AlterColumnTypeEvent( - tableId, Collections.singletonMap("name", STRING)), + tableId, + Collections.singletonMap("name", STRING), + Collections.singletonMap("name", STRING.notNull())), DataChangeEvent.insertEvent( tableId, buildRecord( @@ -1906,7 +1912,9 @@ public void testLenientSchemaEvolves() throws Exception { List alterColumnTypeEvents = Arrays.asList( new AlterColumnTypeEvent( - tableId, ImmutableMap.of("score", 
BIGINT, "toshi", FLOAT)), + tableId, + ImmutableMap.of("score", BIGINT, "toshi", FLOAT), + ImmutableMap.of("score", INT, "toshi", SMALLINT)), DataChangeEvent.insertEvent( tableId, buildRecord( @@ -1923,7 +1931,9 @@ public void testLenientSchemaEvolves() throws Exception { List lenientAlterColumnTypeEvents = Arrays.asList( new AlterColumnTypeEvent( - tableId, ImmutableMap.of("score", BIGINT, "toshi", FLOAT)), + tableId, + ImmutableMap.of("score", BIGINT, "toshi", FLOAT), + ImmutableMap.of("score", INT, "toshi", SMALLINT)), DataChangeEvent.insertEvent( tableId, buildRecord( @@ -2106,7 +2116,9 @@ tableId, buildRecord(INT, 12, INT, 0, SMALLINT, (short) 11)), List lenientDropColumnEvents = Arrays.asList( new AlterColumnTypeEvent( - tableId, Collections.singletonMap("name", STRING)), + tableId, + Collections.singletonMap("name", STRING), + Collections.singletonMap("name", STRING.notNull())), DataChangeEvent.insertEvent( tableId, buildRecord( @@ -2311,7 +2323,9 @@ tableId, buildRecord(INT, 12, INT, 0, SMALLINT, (short) 11)), new AddColumnEvent.ColumnWithPosition( Column.physicalColumn("yina", INT)))), new AlterColumnTypeEvent( - tableId, Collections.singletonMap("iina", INT)), + tableId, + Collections.singletonMap("iina", INT), + Collections.singletonMap("iina", INT.notNull())), DataChangeEvent.insertEvent( tableId, buildRecord( diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/event/AlterColumnTypeEventSerializerTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/event/AlterColumnTypeEventSerializerTest.java index baddd0bf20..1c45d71771 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/event/AlterColumnTypeEventSerializerTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/event/AlterColumnTypeEventSerializerTest.java @@ -49,10 +49,17 @@ protected AlterColumnTypeEvent[] getTestData() { Map map = new HashMap<>(); map.put("col1", 
DataTypes.BYTES()); map.put("col2", DataTypes.TIME()); + + Map oldMap = new HashMap<>(); + oldMap.put("col1", DataTypes.TIME()); + oldMap.put("col2", DataTypes.BYTES()); return new AlterColumnTypeEvent[] { new AlterColumnTypeEvent(TableId.tableId("table"), map), new AlterColumnTypeEvent(TableId.tableId("schema", "table"), map), - new AlterColumnTypeEvent(TableId.tableId("namespace", "schema", "table"), map) + new AlterColumnTypeEvent(TableId.tableId("namespace", "schema", "table"), map), + new AlterColumnTypeEvent(TableId.tableId("table"), map, oldMap), + new AlterColumnTypeEvent(TableId.tableId("schema", "table"), map, oldMap), + new AlterColumnTypeEvent(TableId.tableId("namespace", "schema", "table"), map, oldMap) }; } } diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/event/DropTableEventSerializerTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/event/DropTableEventSerializerTest.java new file mode 100644 index 0000000000..bbeb92c413 --- /dev/null +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/event/DropTableEventSerializerTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.runtime.serializer.event; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.cdc.common.event.DropTableEvent; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.runtime.serializer.SerializerTestBase; + +/** A test for the {@link DropTableEventSerializer}. */ +public class DropTableEventSerializerTest extends SerializerTestBase { + @Override + protected TypeSerializer createSerializer() { + return DropTableEventSerializer.INSTANCE; + } + + @Override + protected int getLength() { + return -1; + } + + @Override + protected Class getTypeClass() { + return DropTableEvent.class; + } + + @Override + protected DropTableEvent[] getTestData() { + return new DropTableEvent[] { + new DropTableEvent(TableId.tableId("table")), + new DropTableEvent(TableId.tableId("schema", "table")), + new DropTableEvent(TableId.tableId("namespace", "schema", "table")) + }; + } +} diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/event/TruncateTableEventSerializerTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/event/TruncateTableEventSerializerTest.java new file mode 100644 index 0000000000..ba2f439b19 --- /dev/null +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/event/TruncateTableEventSerializerTest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.runtime.serializer.event; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.event.TruncateTableEvent; +import org.apache.flink.cdc.runtime.serializer.SerializerTestBase; + +/** A test for the {@link TruncateTableEventSerializer}. */ +public class TruncateTableEventSerializerTest extends SerializerTestBase { + @Override + protected TypeSerializer createSerializer() { + + return TruncateTableEventSerializer.INSTANCE; + } + + @Override + protected int getLength() { + return -1; + } + + @Override + protected Class getTypeClass() { + return TruncateTableEvent.class; + } + + @Override + protected TruncateTableEvent[] getTestData() { + return new TruncateTableEvent[] { + new TruncateTableEvent(TableId.tableId("table")), + new TruncateTableEvent(TableId.tableId("schema", "table")), + new TruncateTableEvent(TableId.tableId("namespace", "schema", "table")) + }; + } +}