diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/dialect/DataSourceDialect.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/dialect/DataSourceDialect.java index 23e775bccc..dd3713a413 100644 --- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/dialect/DataSourceDialect.java +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/dialect/DataSourceDialect.java @@ -18,6 +18,7 @@ import org.apache.flink.annotation.Experimental; +import com.ververica.cdc.connectors.base.config.JdbcSourceConfig; import com.ververica.cdc.connectors.base.config.SourceConfig; import com.ververica.cdc.connectors.base.source.assigner.splitter.ChunkSplitter; import com.ververica.cdc.connectors.base.source.meta.offset.Offset; @@ -69,5 +70,6 @@ public interface DataSourceDialect createFetchTask(SourceSplitBase sourceSplitBase); /** The task context used for fetch task to fetch data from external systems. */ - FetchTask.Context createFetchTaskContext(SourceSplitBase sourceSplitBase); + FetchTask.Context createFetchTaskContext( + SourceSplitBase sourceSplitBase, JdbcSourceConfig taskSourceConfig); } diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/dialect/JdbcDataSourceDialect.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/dialect/JdbcDataSourceDialect.java index 5aaa236156..3f8f29ce97 100644 --- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/dialect/JdbcDataSourceDialect.java +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/dialect/JdbcDataSourceDialect.java @@ -75,5 +75,6 @@ default JdbcConnection openJdbcConnection(JdbcSourceConfig sourceConfig) { FetchTask createFetchTask(SourceSplitBase sourceSplitBase); @Override - JdbcSourceFetchTaskContext createFetchTaskContext(SourceSplitBase sourceSplitBase); + JdbcSourceFetchTaskContext createFetchTaskContext( + SourceSplitBase sourceSplitBase, JdbcSourceConfig taskSourceConfig); } diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/JdbcIncrementalSource.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/JdbcIncrementalSource.java index e8f8855d86..e45e7a927c 100644 --- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/JdbcIncrementalSource.java +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/JdbcIncrementalSource.java @@ -109,7 +109,7 @@ public SourceReader createReader(SourceReaderContext readerContext) { Supplier splitReaderSupplier = () -> new JdbcSourceSplitReader( - readerContext.getIndexOfSubtask(), dataSourceDialect); + readerContext.getIndexOfSubtask(), dataSourceDialect, sourceConfig); return new JdbcIncrementalSourceReader<>( elementsQueue, splitReaderSupplier, diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/JdbcSourceSplitReader.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/JdbcSourceSplitReader.java index c966ced831..f3c50116ab 100644 --- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/JdbcSourceSplitReader.java +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/JdbcSourceSplitReader.java @@ -22,6 +22,7 @@ import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition; import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; +import com.ververica.cdc.connectors.base.config.JdbcSourceConfig; import com.ververica.cdc.connectors.base.dialect.JdbcDataSourceDialect; import com.ververica.cdc.connectors.base.source.meta.split.ChangeEventRecords; import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase; @@ -51,11 +52,14 @@ public class JdbcSourceSplitReader implements SplitReader currentFetcher; @Nullable private String currentSplitId; private final JdbcDataSourceDialect dataSourceDialect; + private final JdbcSourceConfig sourceConfig; - public JdbcSourceSplitReader(int subtaskId, JdbcDataSourceDialect dataSourceDialect) { + public JdbcSourceSplitReader( + int subtaskId, JdbcDataSourceDialect dataSourceDialect, JdbcSourceConfig sourceConfig) { this.subtaskId = subtaskId; this.splits = new ArrayDeque<>(); this.dataSourceDialect = dataSourceDialect; + this.sourceConfig = sourceConfig; } @Override @@ -114,7 +118,7 @@ protected void checkSplitOrStartNext() throws IOException { if (nextSplit.isSnapshotSplit()) { if (currentFetcher == null) { final JdbcSourceFetchTaskContext taskContext = - dataSourceDialect.createFetchTaskContext(nextSplit); + dataSourceDialect.createFetchTaskContext(nextSplit, sourceConfig); currentFetcher = new JdbcSourceScanFetcher(taskContext, subtaskId); } } else { @@ -124,7 +128,7 @@ protected void checkSplitOrStartNext() throws IOException { currentFetcher.close(); } final JdbcSourceFetchTaskContext taskContext = - dataSourceDialect.createFetchTaskContext(nextSplit); + dataSourceDialect.createFetchTaskContext(nextSplit, sourceConfig); currentFetcher = new JdbcSourceStreamFetcher(taskContext, subtaskId); LOG.info("Stream fetcher is created."); } diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/utils/SourceRecordUtils.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/utils/SourceRecordUtils.java index e144b7b422..524a3cd516 100644 --- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/utils/SourceRecordUtils.java +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/utils/SourceRecordUtils.java @@ -46,6 +46,7 @@ import static com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher.SIGNAL_EVENT_VALUE_SCHEMA_NAME; import static com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher.WATERMARK_KIND; import static io.debezium.connector.AbstractSourceInfo.DATABASE_NAME_KEY; +import static io.debezium.connector.AbstractSourceInfo.SCHEMA_NAME_KEY; import static io.debezium.connector.AbstractSourceInfo.TABLE_NAME_KEY; import static org.apache.flink.util.Preconditions.checkState; @@ -137,8 +138,17 @@ public static TableId getTableId(SourceRecord dataRecord) { Struct value = (Struct) dataRecord.value(); Struct source = value.getStruct(Envelope.FieldName.SOURCE); String dbName = source.getString(DATABASE_NAME_KEY); + // Oracle need schemaName + String schemaName = getSchemaName(source); String tableName = source.getString(TABLE_NAME_KEY); - return new TableId(dbName, null, tableName); + return new TableId(dbName, schemaName, tableName); + } + + public static String getSchemaName(Struct source) { + if (source.schema().fields().stream().anyMatch(r -> SCHEMA_NAME_KEY.equals(r.name()))) { + return source.getString(SCHEMA_NAME_KEY); + } + return null; } public static Object[] getSplitKey( diff --git a/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/MySqlChangeEventSourceExampleTest.java b/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/MySqlChangeEventSourceExampleTest.java index 5900ca9553..a3c56b131d 100644 --- a/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/MySqlChangeEventSourceExampleTest.java +++ b/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/MySqlChangeEventSourceExampleTest.java @@ -20,14 +20,27 @@ import org.apache.flink.runtime.minicluster.RpcServiceSharing; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.conversion.RowRowConverter; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.utils.TypeConversions; import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.util.CloseableIterator; import com.ververica.cdc.connectors.base.experimental.MySqlSourceBuilder; +import com.ververica.cdc.connectors.base.experimental.utils.MySqlConnectionUtils; import com.ververica.cdc.connectors.base.source.JdbcIncrementalSource; import com.ververica.cdc.connectors.base.testutils.MySqlContainer; import com.ververica.cdc.connectors.base.testutils.MySqlVersion; import com.ververica.cdc.connectors.base.testutils.UniqueDatabase; import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; +import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema; +import io.debezium.connector.mysql.MySqlConnection; +import io.debezium.jdbc.JdbcConnection; import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Rule; @@ -37,8 +50,20 @@ import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.lifecycle.Startables; +import java.sql.SQLException; +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + /** Example Tests for {@link JdbcIncrementalSource}. */ public class MySqlChangeEventSourceExampleTest { @@ -70,7 +95,7 @@ public static void startContainers() { @Test @Ignore("Test ignored because it won't stop and is used for manual test") - public void testConsumingAllEvents() throws Exception { + public void testConsumingScanEvents() throws Exception { inventoryDatabase.createAndInitialize(); JdbcIncrementalSource mySqlChangeEventSource = new MySqlSourceBuilder() @@ -100,6 +125,154 @@ public void testConsumingAllEvents() throws Exception { env.execute("Print MySQL Snapshot + Binlog"); } + @Test + @Ignore("Test ignored because it won't stop and is used for manual test") + public void testConsumingAllEvents() throws Exception { + final DataType dataType = + DataTypes.ROW( + DataTypes.FIELD("id", DataTypes.BIGINT()), + DataTypes.FIELD("name", DataTypes.STRING()), + DataTypes.FIELD("description", DataTypes.STRING()), + DataTypes.FIELD("weight", DataTypes.FLOAT())); + + inventoryDatabase.createAndInitialize(); + final String tableId = inventoryDatabase.getDatabaseName() + ".products"; + JdbcIncrementalSource mySqlChangeEventSource = + new MySqlSourceBuilder() + .hostname(MYSQL_CONTAINER.getHost()) + .port(MYSQL_CONTAINER.getDatabasePort()) + .databaseList(inventoryDatabase.getDatabaseName()) + .tableList(tableId) + .username(inventoryDatabase.getUsername()) + .password(inventoryDatabase.getPassword()) + .serverId("5401-5404") + .deserializer(buildRowDataDebeziumDeserializeSchema(dataType)) + .includeSchemaChanges(true) // output the schema changes as well + .splitSize(2) + .build(); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + // enable checkpoint + env.enableCheckpointing(3000); + // set the source parallelism to 4 + CloseableIterator iterator = + env.fromSource( + mySqlChangeEventSource, + WatermarkStrategy.noWatermarks(), + "MySqlParallelSource") + .setParallelism(4) + .executeAndCollect(); // collect record + + String[] snapshotExpectedRecords = + new String[] { + "+I[101, scooter, Small 2-wheel scooter, 3.14]", + "+I[102, car battery, 12V car battery, 8.1]", + "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]", + "+I[104, hammer, 12oz carpenter's hammer, 0.75]", + "+I[105, hammer, 14oz carpenter's hammer, 0.875]", + "+I[106, hammer, 16oz carpenter's hammer, 1.0]", + "+I[107, rocks, box of assorted rocks, 5.3]", + "+I[108, jacket, water resistent black wind breaker, 0.1]", + "+I[109, spare tire, 24 inch spare tire, 22.2]" + }; + + // step-1: consume snapshot data + List snapshotRowDataList = new ArrayList(); + for (int i = 0; i < snapshotExpectedRecords.length && iterator.hasNext(); i++) { + snapshotRowDataList.add(iterator.next()); + } + + List snapshotActualRecords = formatResult(snapshotRowDataList, dataType); + assertEqualsInAnyOrder(Arrays.asList(snapshotExpectedRecords), snapshotActualRecords); + + // step-2: make 6 change events in one MySQL transaction + makeBinlogEvents(getConnection(), tableId); + + String[] binlogExpectedRecords = + new String[] { + "-U[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]", + "+U[103, cart, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]", + "+I[110, spare tire, 28 inch spare tire, 26.2]", + "-D[110, spare tire, 28 inch spare tire, 26.2]", + "-U[103, cart, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]", + "+U[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]" + }; + + // step-3: consume binlog change events + List binlogRowDataList = new ArrayList(); + for (int i = 0; i < binlogExpectedRecords.length && iterator.hasNext(); i++) { + binlogRowDataList.add(iterator.next()); + } + List binlogActualRecords = formatResult(binlogRowDataList, dataType); + assertEqualsInAnyOrder(Arrays.asList(binlogExpectedRecords), binlogActualRecords); + + // stop the worker + iterator.close(); + } + + private RowDataDebeziumDeserializeSchema buildRowDataDebeziumDeserializeSchema( + DataType dataType) { + LogicalType logicalType = TypeConversions.fromDataToLogicalType(dataType); + InternalTypeInfo typeInfo = InternalTypeInfo.of(logicalType); + return RowDataDebeziumDeserializeSchema.newBuilder() + .setPhysicalRowType((RowType) dataType.getLogicalType()) + .setResultTypeInfo(typeInfo) + .build(); + } + + private List formatResult(List records, DataType dataType) { + RowRowConverter rowRowConverter = RowRowConverter.create(dataType); + rowRowConverter.open(Thread.currentThread().getContextClassLoader()); + return records.stream() + .map(rowRowConverter::toExternal) + .map(Object::toString) + .collect(Collectors.toList()); + } + + private MySqlConnection getConnection() { + Map properties = new HashMap<>(); + properties.put("database.hostname", MYSQL_CONTAINER.getHost()); + properties.put("database.port", String.valueOf(MYSQL_CONTAINER.getDatabasePort())); + properties.put("database.user", inventoryDatabase.getUsername()); + properties.put("database.password", inventoryDatabase.getPassword()); + properties.put("database.serverTimezone", ZoneId.of("UTC").toString()); + io.debezium.config.Configuration configuration = + io.debezium.config.Configuration.from(properties); + return MySqlConnectionUtils.createMySqlConnection(configuration); + } + + private void makeBinlogEvents(JdbcConnection connection, String tableId) throws SQLException { + try { + connection.setAutoCommit(false); + + // make binlog events + connection.execute( + "UPDATE " + tableId + " SET name = 'cart' where id = 103", + "INSERT INTO " + + tableId + + " VALUES(110,'spare tire','28 inch spare tire','26.2')", + "DELETE FROM " + tableId + " where id = 110", + "UPDATE " + tableId + " SET name = '12-pack drill bits' where id = 103"); + connection.commit(); + } finally { + connection.close(); + } + } + + public static void assertEqualsInAnyOrder(List expected, List actual) { + assertTrue(expected != null && actual != null); + assertEqualsInOrder( + expected.stream().sorted().collect(Collectors.toList()), + actual.stream().sorted().collect(Collectors.toList())); + } + + public static void assertEqualsInOrder(List expected, List actual) { + assertTrue(expected != null && actual != null); + assertEquals(expected.size(), actual.size()); + assertArrayEquals(expected.toArray(new String[0]), actual.toArray(new String[0])); + } + private static MySqlContainer createMySqlContainer(MySqlVersion version) { return (MySqlContainer) new MySqlContainer(version) diff --git a/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/MySqlDialect.java b/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/MySqlDialect.java index a2d323896c..b8a3123ee1 100644 --- a/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/MySqlDialect.java +++ b/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/MySqlDialect.java @@ -136,12 +136,14 @@ public TableChange queryTableSchema(JdbcConnection jdbc, TableId tableId) { } @Override - public MySqlSourceFetchTaskContext createFetchTaskContext(SourceSplitBase sourceSplitBase) { + public MySqlSourceFetchTaskContext createFetchTaskContext( + SourceSplitBase sourceSplitBase, JdbcSourceConfig taskSourceConfig) { final MySqlConnection jdbcConnection = - createMySqlConnection(sourceConfig.getDbzConfiguration()); + createMySqlConnection(taskSourceConfig.getDbzConfiguration()); final BinaryLogClient binaryLogClient = - createBinaryClient(sourceConfig.getDbzConfiguration()); - return new MySqlSourceFetchTaskContext(sourceConfig, this, jdbcConnection, binaryLogClient); + createBinaryClient(taskSourceConfig.getDbzConfiguration()); + return new MySqlSourceFetchTaskContext( + taskSourceConfig, this, jdbcConnection, binaryLogClient); } @Override