diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java new file mode 100644 index 0000000000..72caef1cca --- /dev/null +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java @@ -0,0 +1,1076 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.composer.flink; + +import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.cdc.common.data.binary.BinaryStringData; +import org.apache.flink.cdc.common.event.AddColumnEvent; +import org.apache.flink.cdc.common.event.CreateTableEvent; +import org.apache.flink.cdc.common.event.DataChangeEvent; +import org.apache.flink.cdc.common.event.Event; +import org.apache.flink.cdc.common.event.RenameColumnEvent; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.pipeline.PipelineOptions; +import org.apache.flink.cdc.common.schema.Column; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.types.DataTypes; +import org.apache.flink.cdc.composer.PipelineExecution; +import org.apache.flink.cdc.composer.definition.PipelineDef; +import org.apache.flink.cdc.composer.definition.RouteDef; +import org.apache.flink.cdc.composer.definition.SinkDef; +import org.apache.flink.cdc.composer.definition.SourceDef; +import org.apache.flink.cdc.composer.definition.TransformDef; +import org.apache.flink.cdc.connectors.values.ValuesDatabase; +import org.apache.flink.cdc.connectors.values.factory.ValuesDataFactory; +import org.apache.flink.cdc.connectors.values.sink.ValuesDataSink; +import org.apache.flink.cdc.connectors.values.sink.ValuesDataSinkOptions; +import org.apache.flink.cdc.connectors.values.source.ValuesDataSourceHelper; +import org.apache.flink.cdc.connectors.values.source.ValuesDataSourceOptions; +import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.test.junit5.MiniClusterExtension; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.testcontainers.shaded.com.google.common.collect.ImmutableMap; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; 
+import java.util.List; + +import static org.apache.flink.cdc.connectors.values.source.ValuesDataSourceHelper.TABLE_1; +import static org.apache.flink.cdc.connectors.values.source.ValuesDataSourceHelper.TABLE_2; +import static org.apache.flink.configuration.CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL; +import static org.assertj.core.api.Assertions.assertThat; + +/** Integration test for {@link FlinkPipelineComposer}. */ +class FlinkPipelineComposerLenientITCase { + + private static final int MAX_PARALLELISM = 4; + + // Always use parent-first classloader for CDC classes. + // The reason is that ValuesDatabase uses static field for holding data, we need to make sure + // the class is loaded by AppClassloader so that we can verify data in the test case. + private static final org.apache.flink.configuration.Configuration MINI_CLUSTER_CONFIG = + new org.apache.flink.configuration.Configuration(); + + static { + MINI_CLUSTER_CONFIG.set( + ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL, + Collections.singletonList("org.apache.flink.cdc")); + } + + /** + * Use {@link MiniClusterExtension} to reduce the overhead of restarting the MiniCluster for + * every test case. + */ + @RegisterExtension + static final MiniClusterExtension MINI_CLUSTER_RESOURCE = + new MiniClusterExtension( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(MAX_PARALLELISM) + .setConfiguration(MINI_CLUSTER_CONFIG) + .build()); + + private final PrintStream standardOut = System.out; + private final ByteArrayOutputStream outCaptor = new ByteArrayOutputStream(); + + @BeforeEach + void init() { + // Take over STDOUT as we need to check the output of values sink + System.setOut(new PrintStream(outCaptor)); + // Initialize in-memory database + ValuesDatabase.clear(); + } + + @AfterEach + void cleanup() { + System.setOut(standardOut); + } + + @ParameterizedTest + @EnumSource + void testSingleSplitSingleTable(ValuesDataSink.SinkApi sinkApi) throws Exception { + FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster(); + + // Setup value source + Configuration sourceConfig = new Configuration(); + sourceConfig.set( + ValuesDataSourceOptions.EVENT_SET_ID, + ValuesDataSourceHelper.EventSetId.SINGLE_SPLIT_SINGLE_TABLE); + SourceDef sourceDef = + new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig); + + // Setup value sink + Configuration sinkConfig = new Configuration(); + sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true); + sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi); + SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig); + + // Setup pipeline + Configuration pipelineConfig = new Configuration(); + pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + + PipelineDef pipelineDef = + new PipelineDef( + sourceDef, + sinkDef, + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + pipelineConfig); + + // Execute the pipeline + PipelineExecution execution = composer.compose(pipelineDef); + execution.execute(); + + // Check result in ValuesDatabase + List results = ValuesDatabase.getResults(TABLE_1); + assertThat(results) + .contains( + "default_namespace.default_schema.table1:col1=2;col2=;col3=;newCol2=;newCol3=x", + "default_namespace.default_schema.table1:col1=3;col2=3;col3=;newCol2=;newCol3="); + + // Check the order and content of all received events + String[] outputEvents = outCaptor.toString().trim().split("\n"); + 
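// Note: the expectations below reflect LENIENT schema-change handling (the behavior this
// ITCase is named after). Judging from the expected strings, renamed upstream columns
// surface to the sink as extra AddColumnEvents while the original columns are kept, and
// column drops are not propagated; hence the null padding in the later change events.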
assertThat(outputEvents) + .containsExactly( + "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3], op=INSERT, meta=()}", + "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existedColumnName=null}]}", + "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`newCol2` STRING, position=LAST, existedColumnName=null}, ColumnWithPosition{column=`newCol3` STRING, position=LAST, existedColumnName=null}]}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, null, null, null, 1], after=[], op=DELETE, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, null, null, null, ], after=[2, null, null, null, x], op=UPDATE, meta=()}"); + } + + @ParameterizedTest + @EnumSource + void testSingleSplitMultipleTables(ValuesDataSink.SinkApi sinkApi) throws Exception { + FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster(); + + // Setup value source + Configuration sourceConfig = new Configuration(); + sourceConfig.set( + ValuesDataSourceOptions.EVENT_SET_ID, + ValuesDataSourceHelper.EventSetId.SINGLE_SPLIT_MULTI_TABLES); + SourceDef sourceDef = + new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig); + + // Setup value sink + Configuration sinkConfig = new Configuration(); + sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true); + sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi); + SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig); + + // Setup pipeline + Configuration pipelineConfig = new Configuration(); + pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + + PipelineDef pipelineDef = + new PipelineDef( + sourceDef, + sinkDef, + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + pipelineConfig); + + // Execute the pipeline + PipelineExecution execution = composer.compose(pipelineDef); + execution.execute(); + + // Check result in ValuesDatabase + List table1Results = ValuesDatabase.getResults(TABLE_1); + assertThat(table1Results) + .containsExactly( + "default_namespace.default_schema.table1:col1=2;col2=;col3=;newCol2=;newCol3=x", + "default_namespace.default_schema.table1:col1=3;col2=3;col3=;newCol2=;newCol3="); + List table2Results = ValuesDatabase.getResults(TABLE_2); + assertThat(table2Results) + .contains( + "default_namespace.default_schema.table2:col1=1;col2=1", + "default_namespace.default_schema.table2:col1=2;col2=2", + "default_namespace.default_schema.table2:col1=3;col2=3"); + + // Check the order and content of all received events + String[] outputEvents = outCaptor.toString().trim().split("\n"); + assertThat(outputEvents) + .containsExactly( + "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.table2, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}", + 
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3], op=INSERT, meta=()}", + "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existedColumnName=null}]}", + "DataChangeEvent{tableId=default_namespace.default_schema.table2, before=[], after=[1, 1], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table2, before=[], after=[2, 2], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table2, before=[], after=[3, 3], op=INSERT, meta=()}", + "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`newCol2` STRING, position=LAST, existedColumnName=null}, ColumnWithPosition{column=`newCol3` STRING, position=LAST, existedColumnName=null}]}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, null, null, null, 1], after=[], op=DELETE, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, null, null, null, 2], after=[2, null, null, null, x], op=UPDATE, meta=()}"); + } + + @ParameterizedTest + @EnumSource + void testMultiSplitsSingleTable(ValuesDataSink.SinkApi sinkApi) throws Exception { + FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster(); + + // Setup value source + Configuration sourceConfig = new Configuration(); + sourceConfig.set( + ValuesDataSourceOptions.EVENT_SET_ID, + ValuesDataSourceHelper.EventSetId.MULTI_SPLITS_SINGLE_TABLE); + SourceDef sourceDef = + new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig); + + // Setup value sink + Configuration sinkConfig = new Configuration(); + sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true); + sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi); + SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig); + + // Setup pipeline + Configuration pipelineConfig = new Configuration(); + pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, MAX_PARALLELISM); + PipelineDef pipelineDef = + new PipelineDef( + sourceDef, + sinkDef, + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + pipelineConfig); + + // Execute the pipeline + PipelineExecution execution = composer.compose(pipelineDef); + execution.execute(); + + // Check result in ValuesDatabase + List table1Results = ValuesDatabase.getResults(TABLE_1); + assertThat(table1Results) + .contains( + "default_namespace.default_schema.table1:col1=1;col2=1;col3=x", + "default_namespace.default_schema.table1:col1=3;col2=3;col3=x", + "default_namespace.default_schema.table1:col1=5;col2=5;col3="); + } + + @ParameterizedTest + @EnumSource + void testTransform(ValuesDataSink.SinkApi sinkApi) throws Exception { + FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster(); + + // Setup value source + Configuration sourceConfig = new Configuration(); + sourceConfig.set( + ValuesDataSourceOptions.EVENT_SET_ID, + ValuesDataSourceHelper.EventSetId.TRANSFORM_TABLE); + SourceDef sourceDef = + new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig); + + // Setup value sink + Configuration sinkConfig = new Configuration(); + 
sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true); + sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi); + SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig); + + // Setup transform + TransformDef transformDef = + new TransformDef( + "default_namespace.default_schema.table1", + "*,concat(col1,'0') as col12", + "col1 <> '3'", + "col1", + "col12", + "key1=value1", + ""); + + // Setup pipeline + Configuration pipelineConfig = new Configuration(); + pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + + PipelineDef pipelineDef = + new PipelineDef( + sourceDef, + sinkDef, + Collections.emptyList(), + new ArrayList<>(Arrays.asList(transformDef)), + Collections.emptyList(), + pipelineConfig); + + // Execute the pipeline + PipelineExecution execution = composer.compose(pipelineDef); + execution.execute(); + + // Check the order and content of all received events + String[] outputEvents = outCaptor.toString().trim().split("\n"); + assertThat(outputEvents) + .containsExactly( + "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`col12` STRING}, primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, 10], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 20], op=INSERT, meta=()}", + "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existedColumnName=null}]}", + "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`newCol2` STRING, position=LAST, existedColumnName=null}, ColumnWithPosition{column=`newCol3` STRING, position=LAST, existedColumnName=null}]}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, null, 10, null, null, 1], after=[], op=DELETE, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, null, 20, null, null, ], after=[2, null, 20, null, null, x], op=UPDATE, meta=()}"); + } + + @ParameterizedTest + @EnumSource + void testOpTypeMetadataColumn(ValuesDataSink.SinkApi sinkApi) throws Exception { + FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster(); + + // Setup value source + Configuration sourceConfig = new Configuration(); + sourceConfig.set( + ValuesDataSourceOptions.EVENT_SET_ID, + ValuesDataSourceHelper.EventSetId.TRANSFORM_TABLE); + SourceDef sourceDef = + new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig); + + // Setup value sink + Configuration sinkConfig = new Configuration(); + sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true); + sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi); + SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig); + + // Setup transform + TransformDef transformDef = + new TransformDef( + "default_namespace.default_schema.table1", + "*,concat(col1,'0') as col12,__data_event_type__ as rk", + "col1 <> '3'", + "col1", + "col12", + "key1=value1", + ""); + + // Setup pipeline + Configuration pipelineConfig = new Configuration(); + pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + + PipelineDef pipelineDef = + new PipelineDef( + sourceDef, + sinkDef, + Collections.emptyList(), + new ArrayList<>(Collections.singletonList(transformDef)), + Collections.emptyList(), + 
pipelineConfig); + + // Execute the pipeline + PipelineExecution execution = composer.compose(pipelineDef); + execution.execute(); + + // Check the order and content of all received events + String[] outputEvents = outCaptor.toString().trim().split("\n"); + assertThat(outputEvents) + .containsExactly( + "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`col12` STRING,`rk` STRING}, primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, 10, +I], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 20, +I], op=INSERT, meta=()}", + "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existedColumnName=null}]}", + "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`newCol2` STRING, position=LAST, existedColumnName=null}, ColumnWithPosition{column=`newCol3` STRING, position=LAST, existedColumnName=null}]}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, null, 10, -D, null, null, 1], after=[], op=DELETE, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, null, 20, -U, null, null, ], after=[2, null, 20, +U, null, null, x], op=UPDATE, meta=()}"); + } + + @ParameterizedTest + @EnumSource + void testTransformTwice(ValuesDataSink.SinkApi sinkApi) throws Exception { + FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster(); + + // Setup value source + Configuration sourceConfig = new Configuration(); + sourceConfig.set( + ValuesDataSourceOptions.EVENT_SET_ID, + ValuesDataSourceHelper.EventSetId.TRANSFORM_TABLE); + SourceDef sourceDef = + new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig); + + // Setup value sink + Configuration sinkConfig = new Configuration(); + sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true); + sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi); + SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig); + + // Setup transform + TransformDef transformDef1 = + new TransformDef( + "default_namespace.default_schema.table1", + "*,concat(col1,'1') as col12", + "col1 = '1' OR col1 = '999'", + "col1", + "col12", + "key1=value1", + ""); + TransformDef transformDef2 = + new TransformDef( + "default_namespace.default_schema.table1", + "*,concat(col1,'2') as col12", + "col1 = '2'", + null, + null, + null, + ""); + // Setup pipeline + Configuration pipelineConfig = new Configuration(); + pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + + PipelineDef pipelineDef = + new PipelineDef( + sourceDef, + sinkDef, + Collections.emptyList(), + new ArrayList<>(Arrays.asList(transformDef1, transformDef2)), + Collections.emptyList(), + pipelineConfig); + + // Execute the pipeline + PipelineExecution execution = composer.compose(pipelineDef); + execution.execute(); + + // Check the order and content of all received events + String[] outputEvents = outCaptor.toString().trim().split("\n"); + assertThat(outputEvents) + .containsExactly( + "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`col12` STRING}, primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}", + 
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, 11], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 22], op=INSERT, meta=()}", + "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existedColumnName=null}]}", + "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`newCol2` STRING, position=LAST, existedColumnName=null}, ColumnWithPosition{column=`newCol3` STRING, position=LAST, existedColumnName=null}]}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, null, 11, null, null, 1], after=[], op=DELETE, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, null, 22, null, null, ], after=[2, null, 22, null, null, x], op=UPDATE, meta=()}"); + } + + @Test + void testOneToOneRouting() throws Exception { + FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster(); + + // Setup value source + Configuration sourceConfig = new Configuration(); + sourceConfig.set( + ValuesDataSourceOptions.EVENT_SET_ID, + ValuesDataSourceHelper.EventSetId.SINGLE_SPLIT_MULTI_TABLES); + SourceDef sourceDef = + new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig); + + // Setup value sink + Configuration sinkConfig = new Configuration(); + sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true); + SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig); + + // Setup route + TableId routedTable1 = TableId.tableId("default_namespace", "default_schema", "routed1"); + TableId routedTable2 = TableId.tableId("default_namespace", "default_schema", "routed2"); + List routeDef = + Arrays.asList( + new RouteDef(TABLE_1.toString(), routedTable1.toString(), null, null), + new RouteDef(TABLE_2.toString(), routedTable2.toString(), null, null)); + + // Setup pipeline + Configuration pipelineConfig = new Configuration(); + pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + + PipelineDef pipelineDef = + new PipelineDef( + sourceDef, + sinkDef, + routeDef, + Collections.emptyList(), + Collections.emptyList(), + pipelineConfig); + + // Execute the pipeline + PipelineExecution execution = composer.compose(pipelineDef); + execution.execute(); + + // Check result in ValuesDatabase + List routed1Results = ValuesDatabase.getResults(routedTable1); + assertThat(routed1Results) + .contains( + "default_namespace.default_schema.routed1:col1=2;col2=;col3=;newCol2=;newCol3=x", + "default_namespace.default_schema.routed1:col1=3;col2=3;col3=;newCol2=;newCol3="); + List routed2Results = ValuesDatabase.getResults(routedTable2); + assertThat(routed2Results) + .contains( + "default_namespace.default_schema.routed2:col1=1;col2=1", + "default_namespace.default_schema.routed2:col1=2;col2=2", + "default_namespace.default_schema.routed2:col1=3;col2=3"); + + // Check the order and content of all received events + String[] outputEvents = outCaptor.toString().trim().split("\n"); + assertThat(outputEvents) + .containsExactly( + "CreateTableEvent{tableId=default_namespace.default_schema.routed1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.routed2, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}", + 
"DataChangeEvent{tableId=default_namespace.default_schema.routed1, before=[], after=[1, 1], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.routed1, before=[], after=[2, 2], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.routed1, before=[], after=[3, 3], op=INSERT, meta=()}", + "AddColumnEvent{tableId=default_namespace.default_schema.routed1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existedColumnName=null}]}", + "DataChangeEvent{tableId=default_namespace.default_schema.routed2, before=[], after=[1, 1], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.routed2, before=[], after=[2, 2], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.routed2, before=[], after=[3, 3], op=INSERT, meta=()}", + "AddColumnEvent{tableId=default_namespace.default_schema.routed1, addedColumns=[ColumnWithPosition{column=`newCol2` STRING, position=LAST, existedColumnName=null}, ColumnWithPosition{column=`newCol3` STRING, position=LAST, existedColumnName=null}]}", + "DataChangeEvent{tableId=default_namespace.default_schema.routed1, before=[1, null, null, null, 1], after=[], op=DELETE, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.routed1, before=[2, null, null, null, 2], after=[2, null, null, null, x], op=UPDATE, meta=()}"); + } + + @Test + void testIdenticalOneToOneRouting() throws Exception { + FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster(); + + // Setup value source + Configuration sourceConfig = new Configuration(); + sourceConfig.set( + ValuesDataSourceOptions.EVENT_SET_ID, + ValuesDataSourceHelper.EventSetId.SINGLE_SPLIT_MULTI_TABLES); + SourceDef sourceDef = + new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig); + + // Setup value sink + Configuration sinkConfig = new Configuration(); + sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true); + SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig); + + // Setup route + TableId routedTable1 = TABLE_1; + TableId routedTable2 = TABLE_2; + List routeDef = + Arrays.asList( + new RouteDef(TABLE_1.toString(), routedTable1.toString(), null, null), + new RouteDef(TABLE_2.toString(), routedTable2.toString(), null, null)); + + // Setup pipeline + Configuration pipelineConfig = new Configuration(); + pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + + PipelineDef pipelineDef = + new PipelineDef( + sourceDef, + sinkDef, + routeDef, + Collections.emptyList(), + Collections.emptyList(), + pipelineConfig); + + // Execute the pipeline + PipelineExecution execution = composer.compose(pipelineDef); + execution.execute(); + + // Check result in ValuesDatabase + List routed1Results = ValuesDatabase.getResults(routedTable1); + assertThat(routed1Results) + .contains( + "default_namespace.default_schema.table1:col1=2;col2=;col3=;newCol2=;newCol3=x", + "default_namespace.default_schema.table1:col1=3;col2=3;col3=;newCol2=;newCol3="); + List routed2Results = ValuesDatabase.getResults(routedTable2); + assertThat(routed2Results) + .contains( + "default_namespace.default_schema.table2:col1=1;col2=1", + "default_namespace.default_schema.table2:col1=2;col2=2", + "default_namespace.default_schema.table2:col1=3;col2=3"); + + // Check the order and content of all received events + String[] outputEvents = outCaptor.toString().trim().split("\n"); + assertThat(outputEvents) + .containsExactly( + 
"CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.table2, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3], op=INSERT, meta=()}", + "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existedColumnName=null}]}", + "DataChangeEvent{tableId=default_namespace.default_schema.table2, before=[], after=[1, 1], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table2, before=[], after=[2, 2], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table2, before=[], after=[3, 3], op=INSERT, meta=()}", + "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`newCol2` STRING, position=LAST, existedColumnName=null}, ColumnWithPosition{column=`newCol3` STRING, position=LAST, existedColumnName=null}]}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, null, null, null, 1], after=[], op=DELETE, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, null, null, null, 2], after=[2, null, null, null, x], op=UPDATE, meta=()}"); + } + + @Test + void testMergingWithRoute() throws Exception { + FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster(); + + // Setup value source + Configuration sourceConfig = new Configuration(); + sourceConfig.set( + ValuesDataSourceOptions.EVENT_SET_ID, + ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS); + + TableId myTable1 = TableId.tableId("default_namespace", "default_schema", "mytable1"); + TableId myTable2 = TableId.tableId("default_namespace", "default_schema", "mytable2"); + Schema table1Schema = + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT()) + .physicalColumn("name", DataTypes.STRING()) + .physicalColumn("age", DataTypes.INT()) + .primaryKey("id") + .build(); + Schema table2Schema = + Schema.newBuilder() + .physicalColumn("id", DataTypes.BIGINT()) + .physicalColumn("name", DataTypes.VARCHAR(255)) + .physicalColumn("age", DataTypes.TINYINT()) + .physicalColumn("description", DataTypes.STRING()) + .primaryKey("id") + .build(); + + // Create test dataset: + // Create table 1 [id, name, age] + // Table 1: +I[1, Alice, 18] + // Table 1: +I[2, Bob, 20] + // Table 1: -U[2, Bob, 20] +U[2, Bob, 30] + // Create table 2 [id, name, age] + // Table 2: +I[3, Charlie, 15, student] + // Table 2: +I[4, Donald, 25, student] + // Table 2: -D[4, Donald, 25, student] + // Rename column for table 1: name -> last_name + // Add column for table 2: gender + // Table 1: +I[5, Eliza, 24] + // Table 2: +I[6, Frank, 30, student, male] + List events = new ArrayList<>(); + BinaryRecordDataGenerator table1dataGenerator = + new BinaryRecordDataGenerator( + table1Schema.getColumnDataTypes().toArray(new DataType[0])); + BinaryRecordDataGenerator table2dataGenerator = + new BinaryRecordDataGenerator( + table2Schema.getColumnDataTypes().toArray(new DataType[0])); + events.add(new CreateTableEvent(myTable1, 
table1Schema)); + events.add( + DataChangeEvent.insertEvent( + myTable1, + table1dataGenerator.generate( + new Object[] {1, BinaryStringData.fromString("Alice"), 18}))); + events.add( + DataChangeEvent.insertEvent( + myTable1, + table1dataGenerator.generate( + new Object[] {2, BinaryStringData.fromString("Bob"), 20}))); + events.add( + DataChangeEvent.updateEvent( + myTable1, + table1dataGenerator.generate( + new Object[] {2, BinaryStringData.fromString("Bob"), 20}), + table1dataGenerator.generate( + new Object[] {2, BinaryStringData.fromString("Bob"), 30}))); + events.add(new CreateTableEvent(myTable2, table2Schema)); + events.add( + DataChangeEvent.insertEvent( + myTable2, + table2dataGenerator.generate( + new Object[] { + 3L, + BinaryStringData.fromString("Charlie"), + (byte) 15, + BinaryStringData.fromString("student") + }))); + events.add( + DataChangeEvent.insertEvent( + myTable2, + table2dataGenerator.generate( + new Object[] { + 4L, + BinaryStringData.fromString("Donald"), + (byte) 25, + BinaryStringData.fromString("student") + }))); + events.add( + DataChangeEvent.deleteEvent( + myTable2, + table2dataGenerator.generate( + new Object[] { + 4L, + BinaryStringData.fromString("Donald"), + (byte) 25, + BinaryStringData.fromString("student") + }))); + events.add(new RenameColumnEvent(myTable1, ImmutableMap.of("name", "last_name"))); + events.add( + new AddColumnEvent( + myTable2, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn("gender", DataTypes.STRING()))))); + events.add( + DataChangeEvent.insertEvent( + myTable1, + table1dataGenerator.generate( + new Object[] {5, BinaryStringData.fromString("Eliza"), 24}))); + events.add( + DataChangeEvent.insertEvent( + myTable2, + new BinaryRecordDataGenerator( + new DataType[] { + DataTypes.BIGINT(), + DataTypes.VARCHAR(255), + DataTypes.TINYINT(), + DataTypes.STRING(), + DataTypes.STRING() + }) + .generate( + new Object[] { + 6L, + BinaryStringData.fromString("Frank"), + (byte) 30, + BinaryStringData.fromString("student"), + BinaryStringData.fromString("male") + }))); + + ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events)); + + SourceDef sourceDef = + new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig); + + // Setup value sink + Configuration sinkConfig = new Configuration(); + sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true); + SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig); + + // Setup route + TableId mergedTable = TableId.tableId("default_namespace", "default_schema", "merged"); + List routeDef = + Collections.singletonList( + new RouteDef( + "default_namespace.default_schema.mytable[0-9]", + mergedTable.toString(), + null, + null)); + + // Setup pipeline + Configuration pipelineConfig = new Configuration(); + pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + + PipelineDef pipelineDef = + new PipelineDef( + sourceDef, + sinkDef, + routeDef, + Collections.emptyList(), + Collections.emptyList(), + pipelineConfig); + + // Execute the pipeline + PipelineExecution execution = composer.compose(pipelineDef); + execution.execute(); + Schema mergedTableSchema = ValuesDatabase.getTableSchema(mergedTable); + assertThat(mergedTableSchema) + .isEqualTo( + Schema.newBuilder() + .physicalColumn("id", DataTypes.BIGINT()) + .physicalColumn("name", DataTypes.STRING()) + .physicalColumn("age", DataTypes.BIGINT()) + .physicalColumn("description", DataTypes.STRING()) + .physicalColumn("last_name", 
DataTypes.STRING()) + .physicalColumn("gender", DataTypes.STRING()) + .primaryKey("id") + .build()); + String[] outputEvents = outCaptor.toString().trim().split("\n"); + assertThat(outputEvents) + .containsExactly( + "CreateTableEvent{tableId=default_namespace.default_schema.merged, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[1, Alice, 18], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[2, Bob, 20], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[2, Bob, 20], after=[2, Bob, 30], op=UPDATE, meta=()}", + "AddColumnEvent{tableId=default_namespace.default_schema.merged, addedColumns=[ColumnWithPosition{column=`description` STRING, position=LAST, existedColumnName=null}]}", + "AlterColumnTypeEvent{tableId=default_namespace.default_schema.merged, nameMapping={age=BIGINT, id=BIGINT}}", + "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[3, Charlie, 15, student], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[4, Donald, 25, student], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[4, Donald, 25, student], after=[], op=DELETE, meta=()}", + "AddColumnEvent{tableId=default_namespace.default_schema.merged, addedColumns=[ColumnWithPosition{column=`last_name` STRING, position=LAST, existedColumnName=null}]}", + "AddColumnEvent{tableId=default_namespace.default_schema.merged, addedColumns=[ColumnWithPosition{column=`gender` STRING, position=LAST, existedColumnName=null}]}", + "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[5, null, 24, null, Eliza, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[6, Frank, 30, student, null, male], op=INSERT, meta=()}"); + } + + @Test + void testTransformMergingWithRoute() throws Exception { + FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster(); + + // Setup value source + Configuration sourceConfig = new Configuration(); + sourceConfig.set( + ValuesDataSourceOptions.EVENT_SET_ID, + ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS); + + TableId myTable1 = TableId.tableId("default_namespace", "default_schema", "mytable1"); + TableId myTable2 = TableId.tableId("default_namespace", "default_schema", "mytable2"); + Schema table1Schema = + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT()) + .physicalColumn("name", DataTypes.STRING()) + .physicalColumn("age", DataTypes.INT()) + .primaryKey("id") + .build(); + Schema table2Schema = + Schema.newBuilder() + .physicalColumn("id", DataTypes.BIGINT()) + .physicalColumn("name", DataTypes.VARCHAR(255)) + .physicalColumn("age", DataTypes.TINYINT()) + .physicalColumn("description", DataTypes.STRING()) + .primaryKey("id") + .build(); + + // Create test dataset: + // Create table 1 [id, name, age] + // Table 1: +I[1, Alice, 18] + // Table 1: +I[2, Bob, 20] + // Table 1: -U[2, Bob, 20] +U[2, Bob, 30] + // Create table 2 [id, name, age, description] + // Table 2: +I[3, Charlie, 15, student] + // Table 2: +I[4, Donald, 25, student] + // Table 2: -D[4, Donald, 25, student] + // Rename column for table 1: name -> last_name + // Add column for table 2: gender + // Table 1: +I[5, Eliza, 24] + // Table 2: +I[6, Frank, 30, student, male] + 
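// Unlike testMergingWithRoute above, the rename of `name` to `last_name` appears to be
// skipped on purpose here (the RenameColumnEvent is kept below only as a commented-out
// line): the transform projection "*,'last_name' as last_name" already appends a
// last_name column to every routed table, which the merged schema and expected events
// at the end of this test verify.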
List events = new ArrayList<>(); + BinaryRecordDataGenerator table1dataGenerator = + new BinaryRecordDataGenerator( + table1Schema.getColumnDataTypes().toArray(new DataType[0])); + BinaryRecordDataGenerator table2dataGenerator = + new BinaryRecordDataGenerator( + table2Schema.getColumnDataTypes().toArray(new DataType[0])); + events.add(new CreateTableEvent(myTable1, table1Schema)); + events.add( + DataChangeEvent.insertEvent( + myTable1, + table1dataGenerator.generate( + new Object[] {1, BinaryStringData.fromString("Alice"), 18}))); + events.add( + DataChangeEvent.insertEvent( + myTable1, + table1dataGenerator.generate( + new Object[] {2, BinaryStringData.fromString("Bob"), 20}))); + events.add( + DataChangeEvent.updateEvent( + myTable1, + table1dataGenerator.generate( + new Object[] {2, BinaryStringData.fromString("Bob"), 20}), + table1dataGenerator.generate( + new Object[] {2, BinaryStringData.fromString("Bob"), 30}))); + events.add(new CreateTableEvent(myTable2, table2Schema)); + events.add( + DataChangeEvent.insertEvent( + myTable2, + table2dataGenerator.generate( + new Object[] { + 3L, + BinaryStringData.fromString("Charlie"), + (byte) 15, + BinaryStringData.fromString("student") + }))); + events.add( + DataChangeEvent.insertEvent( + myTable2, + table2dataGenerator.generate( + new Object[] { + 4L, + BinaryStringData.fromString("Donald"), + (byte) 25, + BinaryStringData.fromString("student") + }))); + events.add( + DataChangeEvent.deleteEvent( + myTable2, + table2dataGenerator.generate( + new Object[] { + 4L, + BinaryStringData.fromString("Donald"), + (byte) 25, + BinaryStringData.fromString("student") + }))); + // events.add(new RenameColumnEvent(myTable1, ImmutableMap.of("name", "last_name"))); + events.add( + new AddColumnEvent( + myTable2, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn("gender", DataTypes.STRING()))))); + events.add( + DataChangeEvent.insertEvent( + myTable1, + table1dataGenerator.generate( + new Object[] {5, BinaryStringData.fromString("Eliza"), 24}))); + events.add( + DataChangeEvent.insertEvent( + myTable2, + new BinaryRecordDataGenerator( + new DataType[] { + DataTypes.BIGINT(), + DataTypes.VARCHAR(255), + DataTypes.TINYINT(), + DataTypes.STRING(), + DataTypes.STRING() + }) + .generate( + new Object[] { + 6L, + BinaryStringData.fromString("Frank"), + (byte) 30, + BinaryStringData.fromString("student"), + BinaryStringData.fromString("male") + }))); + + ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events)); + + SourceDef sourceDef = + new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig); + + // Setup value sink + Configuration sinkConfig = new Configuration(); + sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true); + SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig); + + // Setup transform + List transformDef = + Collections.singletonList( + new TransformDef( + "default_namespace.default_schema.mytable[0-9]", + "*,'last_name' as last_name", + null, + null, + null, + null, + "")); + + // Setup route + TableId mergedTable = TableId.tableId("default_namespace", "default_schema", "merged"); + List routeDef = + Collections.singletonList( + new RouteDef( + "default_namespace.default_schema.mytable[0-9]", + mergedTable.toString(), + null, + null)); + + // Setup pipeline + Configuration pipelineConfig = new Configuration(); + pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + + PipelineDef pipelineDef = + new PipelineDef( + 
sourceDef, + sinkDef, + routeDef, + transformDef, + Collections.emptyList(), + pipelineConfig); + + // Execute the pipeline + PipelineExecution execution = composer.compose(pipelineDef); + execution.execute(); + Schema mergedTableSchema = ValuesDatabase.getTableSchema(mergedTable); + assertThat(mergedTableSchema) + .isEqualTo( + Schema.newBuilder() + .physicalColumn("id", DataTypes.BIGINT()) + .physicalColumn("name", DataTypes.STRING()) + .physicalColumn("age", DataTypes.BIGINT()) + .physicalColumn("last_name", DataTypes.STRING()) + .physicalColumn("description", DataTypes.STRING()) + .physicalColumn("gender", DataTypes.STRING()) + .primaryKey("id") + .build()); + String[] outputEvents = outCaptor.toString().trim().split("\n"); + assertThat(outputEvents) + .containsExactly( + "CreateTableEvent{tableId=default_namespace.default_schema.merged, schema=columns={`id` INT,`name` STRING,`age` INT,`last_name` STRING}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[1, Alice, 18, last_name], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[2, Bob, 20, last_name], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[2, Bob, 20, last_name], after=[2, Bob, 30, last_name], op=UPDATE, meta=()}", + "AddColumnEvent{tableId=default_namespace.default_schema.merged, addedColumns=[ColumnWithPosition{column=`description` STRING, position=LAST, existedColumnName=null}]}", + "AlterColumnTypeEvent{tableId=default_namespace.default_schema.merged, nameMapping={age=BIGINT, id=BIGINT}}", + "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[3, Charlie, 15, last_name, student], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[4, Donald, 25, last_name, student], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[4, Donald, 25, last_name, student], after=[], op=DELETE, meta=()}", + "AddColumnEvent{tableId=default_namespace.default_schema.merged, addedColumns=[ColumnWithPosition{column=`gender` STRING, position=LAST, existedColumnName=null}]}", + "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[5, Eliza, 24, last_name, null, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[6, Frank, 30, last_name, student, male], op=INSERT, meta=()}"); + } + + @ParameterizedTest + @EnumSource + void testRouteWithReplaceSymbol(ValuesDataSink.SinkApi sinkApi) throws Exception { + FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster(); + + // Setup value source + Configuration sourceConfig = new Configuration(); + sourceConfig.set( + ValuesDataSourceOptions.EVENT_SET_ID, + ValuesDataSourceHelper.EventSetId.SINGLE_SPLIT_MULTI_TABLES); + SourceDef sourceDef = + new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig); + + // Setup value sink + Configuration sinkConfig = new Configuration(); + sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true); + sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi); + SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig); + + // Setup pipeline + Configuration pipelineConfig = new Configuration(); + pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + + PipelineDef pipelineDef = + new PipelineDef( + sourceDef, + 
sinkDef, + Collections.singletonList( + new RouteDef( + "default_namespace.default_schema.table[0-9]", + "replaced_namespace.replaced_schema.__$__", + "__$__", + null)), + Collections.emptyList(), + Collections.emptyList(), + pipelineConfig); + + // Execute the pipeline + PipelineExecution execution = composer.compose(pipelineDef); + execution.execute(); + + // Check the order and content of all received events + String[] outputEvents = outCaptor.toString().trim().split("\n"); + assertThat(outputEvents) + .containsExactly( + "CreateTableEvent{tableId=replaced_namespace.replaced_schema.table1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}", + "CreateTableEvent{tableId=replaced_namespace.replaced_schema.table2, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}", + "DataChangeEvent{tableId=replaced_namespace.replaced_schema.table1, before=[], after=[1, 1], op=INSERT, meta=()}", + "DataChangeEvent{tableId=replaced_namespace.replaced_schema.table1, before=[], after=[2, 2], op=INSERT, meta=()}", + "DataChangeEvent{tableId=replaced_namespace.replaced_schema.table1, before=[], after=[3, 3], op=INSERT, meta=()}", + "AddColumnEvent{tableId=replaced_namespace.replaced_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existedColumnName=null}]}", + "DataChangeEvent{tableId=replaced_namespace.replaced_schema.table2, before=[], after=[1, 1], op=INSERT, meta=()}", + "DataChangeEvent{tableId=replaced_namespace.replaced_schema.table2, before=[], after=[2, 2], op=INSERT, meta=()}", + "DataChangeEvent{tableId=replaced_namespace.replaced_schema.table2, before=[], after=[3, 3], op=INSERT, meta=()}", + "AddColumnEvent{tableId=replaced_namespace.replaced_schema.table1, addedColumns=[ColumnWithPosition{column=`newCol2` STRING, position=LAST, existedColumnName=null}, ColumnWithPosition{column=`newCol3` STRING, position=LAST, existedColumnName=null}]}", + "DataChangeEvent{tableId=replaced_namespace.replaced_schema.table1, before=[1, null, null, null, 1], after=[], op=DELETE, meta=()}", + "DataChangeEvent{tableId=replaced_namespace.replaced_schema.table1, before=[2, null, null, null, 2], after=[2, null, null, null, x], op=UPDATE, meta=()}"); + } +} diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java index 64e3e9c573..63ab22f4e7 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java @@ -207,6 +207,91 @@ public void testFineGrainedSchemaEvolution() throws Exception { "Ignored schema change DropColumnEvent{tableId=%s.members, droppedColumnNames=[biological_sex]} to table %s.members.")); } + @Test + public void testLenientWithRoute() throws Exception { + String dbName = schemaEvolveDatabase.getDatabaseName(); + + String pipelineJob = + String.format( + "source:\n" + + " type: mysql\n" + + " hostname: %s\n" + + " port: 3306\n" + + " username: %s\n" + + " password: %s\n" + + " tables: %s.members\n" + + " server-id: 5400-5404\n" + + " server-time-zone: UTC\n" + + "\n" + + "route:\n" + + " - source-table: %s.members\n" + + " sink-table: %s.redirect\n" + + "sink:\n" + + " type: values\n" + + "\n" + + "pipeline:\n" 
+ + " schema.change.behavior: lenient\n" + + " parallelism: %d", + INTER_CONTAINER_MYSQL_ALIAS, + MYSQL_TEST_USER, + MYSQL_TEST_PASSWORD, + dbName, + dbName, + dbName, + parallelism); + Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); + Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar"); + Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); + submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar); + waitUntilJobRunning(Duration.ofSeconds(30)); + LOG.info("Pipeline job is running"); + validateSnapshotData(dbName, "redirect"); + + LOG.info("Starting schema evolution"); + String mysqlJdbcUrl = + String.format( + "jdbc:mysql://%s:%s/%s", MYSQL.getHost(), MYSQL.getDatabasePort(), dbName); + + try (Connection conn = + DriverManager.getConnection( + mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD); + Statement stmt = conn.createStatement()) { + + waitForIncrementalStage(dbName, "redirect", stmt); + + // triggers AddColumnEvent + stmt.execute("ALTER TABLE members ADD COLUMN gender TINYINT AFTER age;"); + stmt.execute("INSERT INTO members VALUES (1012, 'Eve', 17, 0);"); + + // triggers AlterColumnTypeEvent and RenameColumnEvent + stmt.execute("ALTER TABLE members CHANGE COLUMN age precise_age DOUBLE;"); + + // triggers RenameColumnEvent + stmt.execute("ALTER TABLE members RENAME COLUMN gender TO biological_sex;"); + + // triggers DropColumnEvent + stmt.execute("ALTER TABLE members DROP COLUMN biological_sex"); + stmt.execute("INSERT INTO members VALUES (1013, 'Fiona', 16);"); + stmt.execute("INSERT INTO members VALUES (1014, 'Gem', 17);"); + } + + List expectedTaskManagerEvents = + Arrays.asList( + "AddColumnEvent{tableId=%s.redirect, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=LAST, existedColumnName=null}]}", + "DataChangeEvent{tableId=%s.redirect, before=[], after=[1012, Eve, 17, 0], op=INSERT, meta=()}", + "AlterColumnTypeEvent{tableId=%s.redirect, nameMapping={age=DOUBLE}}", + "AddColumnEvent{tableId=%s.redirect, addedColumns=[ColumnWithPosition{column=`precise_age` DOUBLE, position=LAST, existedColumnName=null}]}", + "AddColumnEvent{tableId=%s.redirect, addedColumns=[ColumnWithPosition{column=`biological_sex` TINYINT, position=LAST, existedColumnName=null}]}", + "DataChangeEvent{tableId=%s.redirect, before=[], after=[1013, Fiona, null, null, 16.0, null], op=INSERT, meta=()}"); + + List expectedTmEvents = + expectedTaskManagerEvents.stream() + .map(s -> String.format(s, dbName, dbName)) + .collect(Collectors.toList()); + + validateResult(expectedTmEvents, taskManagerConsumer); + } + @Test public void testUnexpectedBehavior() { String pipelineJob = diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java index 9cb0e3e007..d24ca7261d 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java @@ -337,8 +337,8 @@ public void close() throws IOException { private List calculateDerivedSchemaChangeEvents(SchemaChangeEvent event) { if (SchemaChangeBehavior.LENIENT.equals(schemaChangeBehavior)) { - return lenientizeSchemaChangeEvent(event).stream() - .flatMap(evt -> 
schemaDerivation.applySchemaChange(evt).stream()) + return schemaDerivation.applySchemaChange(event).stream() + .flatMap(evt -> lenientizeSchemaChangeEvent(evt).stream()) .collect(Collectors.toList()); } else { return schemaDerivation.applySchemaChange(event);
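The swap in calculateDerivedSchemaChangeEvents is the behavioral core of this change: under LENIENT behavior, route derivation (schemaDerivation.applySchemaChange, which rewrites an event onto its routed sink tables) now runs first, and each derived event is then lenientized, instead of lenientizing against the upstream table and routing the result afterwards. That is exactly the route-plus-lenient scenario the new FlinkPipelineComposerLenientITCase and testLenientWithRoute cases exercise. Below is a minimal, self-contained sketch of the ordering difference only; it uses plain strings and placeholder functions, so the class and method names are illustrative and not part of the actual runtime.

import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

public class LenientOrderingSketch {

    // Stand-in for SchemaDerivation.applySchemaChange(): maps one upstream event onto the
    // (possibly merged) routed sink table. Here it simply tags the event with the route.
    static final Function<String, List<String>> DERIVE =
            event -> Arrays.asList(event + " @ routedTable");

    // Stand-in for lenientizeSchemaChangeEvent(): rewrites an event into its lenient form.
    static final Function<String, List<String>> LENIENTIZE =
            event -> Arrays.asList("lenient(" + event + ")");

    public static void main(String[] args) {
        String event = "RenameColumnEvent @ sourceTable";

        // Old order: lenientize first, then derive the routed events.
        List<String> oldOrder =
                LENIENTIZE.apply(event).stream()
                        .flatMap(e -> DERIVE.apply(e).stream())
                        .collect(Collectors.toList());

        // New order: derive the routed events first, then lenientize each of them,
        // so the lenient rewrite happens per routed sink table.
        List<String> newOrder =
                DERIVE.apply(event).stream()
                        .flatMap(e -> LENIENTIZE.apply(e).stream())
                        .collect(Collectors.toList());

        System.out.println(oldOrder); // [lenient(RenameColumnEvent @ sourceTable) @ routedTable]
        System.out.println(newOrder); // [lenient(RenameColumnEvent @ sourceTable @ routedTable)]
    }
}

The printed strings only demonstrate composition order; the practical effect in the handler is that the lenient rewrite is computed per derived (routed) event rather than once for the upstream table.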