Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][cdc-base] Flink CDC base registers the identical history engine on multiple tasks #1340

Merged
merged 5 commits into from
Aug 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.apache.flink.annotation.Experimental;

import com.ververica.cdc.connectors.base.config.JdbcSourceConfig;
import com.ververica.cdc.connectors.base.config.SourceConfig;
import com.ververica.cdc.connectors.base.source.assigner.splitter.ChunkSplitter;
import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
Expand Down Expand Up @@ -69,5 +70,6 @@ public interface DataSourceDialect<ID extends DataCollectionId, S, C extends Sou
FetchTask<SourceSplitBase> createFetchTask(SourceSplitBase sourceSplitBase);

/** The task context used for fetch task to fetch data from external systems. */
FetchTask.Context createFetchTaskContext(SourceSplitBase sourceSplitBase);
FetchTask.Context createFetchTaskContext(
SourceSplitBase sourceSplitBase, JdbcSourceConfig taskSourceConfig);
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,5 +75,6 @@ default JdbcConnection openJdbcConnection(JdbcSourceConfig sourceConfig) {
FetchTask<SourceSplitBase> createFetchTask(SourceSplitBase sourceSplitBase);

@Override
JdbcSourceFetchTaskContext createFetchTaskContext(SourceSplitBase sourceSplitBase);
JdbcSourceFetchTaskContext createFetchTaskContext(
SourceSplitBase sourceSplitBase, JdbcSourceConfig taskSourceConfig);
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public SourceReader createReader(SourceReaderContext readerContext) {
Supplier<JdbcSourceSplitReader> splitReaderSupplier =
() ->
new JdbcSourceSplitReader(
readerContext.getIndexOfSubtask(), dataSourceDialect);
readerContext.getIndexOfSubtask(), dataSourceDialect, sourceConfig);
return new JdbcIncrementalSourceReader<>(
elementsQueue,
splitReaderSupplier,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;

import com.ververica.cdc.connectors.base.config.JdbcSourceConfig;
import com.ververica.cdc.connectors.base.dialect.JdbcDataSourceDialect;
import com.ververica.cdc.connectors.base.source.meta.split.ChangeEventRecords;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
Expand Down Expand Up @@ -51,11 +52,14 @@ public class JdbcSourceSplitReader implements SplitReader<SourceRecord, SourceSp
@Nullable private Fetcher<SourceRecord, SourceSplitBase> currentFetcher;
@Nullable private String currentSplitId;
private final JdbcDataSourceDialect dataSourceDialect;
private final JdbcSourceConfig sourceConfig;

public JdbcSourceSplitReader(int subtaskId, JdbcDataSourceDialect dataSourceDialect) {
public JdbcSourceSplitReader(
int subtaskId, JdbcDataSourceDialect dataSourceDialect, JdbcSourceConfig sourceConfig) {
this.subtaskId = subtaskId;
this.splits = new ArrayDeque<>();
this.dataSourceDialect = dataSourceDialect;
this.sourceConfig = sourceConfig;
}

@Override
Expand Down Expand Up @@ -114,7 +118,7 @@ protected void checkSplitOrStartNext() throws IOException {
if (nextSplit.isSnapshotSplit()) {
if (currentFetcher == null) {
final JdbcSourceFetchTaskContext taskContext =
dataSourceDialect.createFetchTaskContext(nextSplit);
dataSourceDialect.createFetchTaskContext(nextSplit, sourceConfig);
currentFetcher = new JdbcSourceScanFetcher(taskContext, subtaskId);
}
} else {
Expand All @@ -124,7 +128,7 @@ protected void checkSplitOrStartNext() throws IOException {
currentFetcher.close();
}
final JdbcSourceFetchTaskContext taskContext =
dataSourceDialect.createFetchTaskContext(nextSplit);
dataSourceDialect.createFetchTaskContext(nextSplit, sourceConfig);
currentFetcher = new JdbcSourceStreamFetcher(taskContext, subtaskId);
LOG.info("Stream fetcher is created.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import static com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher.SIGNAL_EVENT_VALUE_SCHEMA_NAME;
import static com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher.WATERMARK_KIND;
import static io.debezium.connector.AbstractSourceInfo.DATABASE_NAME_KEY;
import static io.debezium.connector.AbstractSourceInfo.SCHEMA_NAME_KEY;
import static io.debezium.connector.AbstractSourceInfo.TABLE_NAME_KEY;
import static org.apache.flink.util.Preconditions.checkState;

Expand Down Expand Up @@ -137,8 +138,17 @@ public static TableId getTableId(SourceRecord dataRecord) {
Struct value = (Struct) dataRecord.value();
Struct source = value.getStruct(Envelope.FieldName.SOURCE);
String dbName = source.getString(DATABASE_NAME_KEY);
// Oracle needs the schema name
String schemaName = getSchemaName(source);
String tableName = source.getString(TABLE_NAME_KEY);
return new TableId(dbName, null, tableName);
return new TableId(dbName, schemaName, tableName);
}

public static String getSchemaName(Struct source) {
if (source.schema().fields().stream().anyMatch(r -> SCHEMA_NAME_KEY.equals(r.name()))) {
return source.getString(SCHEMA_NAME_KEY);
}
return null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be better to add a test for this method.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also think we only need the changes in this class, plus some tests for it. Other changes need to be removed from this PR.

}

public static Object[] getSplitKey(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,27 @@
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.conversion.RowRowConverter;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.CloseableIterator;

import com.ververica.cdc.connectors.base.experimental.MySqlSourceBuilder;
import com.ververica.cdc.connectors.base.experimental.utils.MySqlConnectionUtils;
import com.ververica.cdc.connectors.base.source.JdbcIncrementalSource;
import com.ververica.cdc.connectors.base.testutils.MySqlContainer;
import com.ververica.cdc.connectors.base.testutils.MySqlVersion;
import com.ververica.cdc.connectors.base.testutils.UniqueDatabase;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.jdbc.JdbcConnection;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Rule;
Expand All @@ -37,8 +50,20 @@
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;

import java.sql.SQLException;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

/** Example Tests for {@link JdbcIncrementalSource}. */
public class MySqlChangeEventSourceExampleTest {

Expand Down Expand Up @@ -70,7 +95,7 @@ public static void startContainers() {

@Test
@Ignore("Test ignored because it won't stop and is used for manual test")
public void testConsumingAllEvents() throws Exception {
public void testConsumingScanEvents() throws Exception {
inventoryDatabase.createAndInitialize();
JdbcIncrementalSource<String> mySqlChangeEventSource =
new MySqlSourceBuilder()
Expand Down Expand Up @@ -100,6 +125,154 @@ public void testConsumingAllEvents() throws Exception {
env.execute("Print MySQL Snapshot + Binlog");
}

/**
 * Reads the {@code products} table through a parallelism-4 source, first checking the rows
 * collected for the snapshot phase and then the changelog rows collected after {@code
 * makeBinlogEvents} has modified the table. Both phases are compared ignoring order.
 *
 * <p>The collecting iterator is managed by try-with-resources so that it is closed even when
 * an assertion or a database call fails part-way through the test.
 */
@Test
@Ignore("Test ignored because it won't stop and is used for manual test")
public void testConsumingAllEvents() throws Exception {
    final DataType dataType =
            DataTypes.ROW(
                    DataTypes.FIELD("id", DataTypes.BIGINT()),
                    DataTypes.FIELD("name", DataTypes.STRING()),
                    DataTypes.FIELD("description", DataTypes.STRING()),
                    DataTypes.FIELD("weight", DataTypes.FLOAT()));

    inventoryDatabase.createAndInitialize();
    final String tableId = inventoryDatabase.getDatabaseName() + ".products";
    JdbcIncrementalSource<RowData> mySqlChangeEventSource =
            new MySqlSourceBuilder()
                    .hostname(MYSQL_CONTAINER.getHost())
                    .port(MYSQL_CONTAINER.getDatabasePort())
                    .databaseList(inventoryDatabase.getDatabaseName())
                    .tableList(tableId)
                    .username(inventoryDatabase.getUsername())
                    .password(inventoryDatabase.getPassword())
                    .serverId("5401-5404")
                    .deserializer(buildRowDataDebeziumDeserializeSchema(dataType))
                    .includeSchemaChanges(true) // output the schema changes as well
                    .splitSize(2)
                    .build();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // enable checkpoint
    env.enableCheckpointing(3000);

    final String[] snapshotExpectedRecords =
            new String[] {
                "+I[101, scooter, Small 2-wheel scooter, 3.14]",
                "+I[102, car battery, 12V car battery, 8.1]",
                "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]",
                "+I[104, hammer, 12oz carpenter's hammer, 0.75]",
                "+I[105, hammer, 14oz carpenter's hammer, 0.875]",
                "+I[106, hammer, 16oz carpenter's hammer, 1.0]",
                "+I[107, rocks, box of assorted rocks, 5.3]",
                "+I[108, jacket, water resistent black wind breaker, 0.1]",
                "+I[109, spare tire, 24 inch spare tire, 22.2]"
            };

    final String[] binlogExpectedRecords =
            new String[] {
                "-U[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]",
                "+U[103, cart, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]",
                "+I[110, spare tire, 28 inch spare tire, 26.2]",
                "-D[110, spare tire, 28 inch spare tire, 26.2]",
                "-U[103, cart, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]",
                "+U[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]"
            };

    // set the source parallelism to 4 and collect the records; leaving the try block closes
    // the iterator, which stops the worker
    try (CloseableIterator<RowData> iterator =
            env.fromSource(
                            mySqlChangeEventSource,
                            WatermarkStrategy.noWatermarks(),
                            "MySqlParallelSource")
                    .setParallelism(4)
                    .executeAndCollect()) {

        // step-1: consume snapshot data
        List<RowData> snapshotRowDataList = new ArrayList<>(snapshotExpectedRecords.length);
        for (int i = 0; i < snapshotExpectedRecords.length && iterator.hasNext(); i++) {
            snapshotRowDataList.add(iterator.next());
        }

        List<String> snapshotActualRecords = formatResult(snapshotRowDataList, dataType);
        assertEqualsInAnyOrder(Arrays.asList(snapshotExpectedRecords), snapshotActualRecords);

        // step-2: make 6 change events in one MySQL transaction
        makeBinlogEvents(getConnection(), tableId);

        // step-3: consume binlog change events
        List<RowData> binlogRowDataList = new ArrayList<>(binlogExpectedRecords.length);
        for (int i = 0; i < binlogExpectedRecords.length && iterator.hasNext(); i++) {
            binlogRowDataList.add(iterator.next());
        }
        List<String> binlogActualRecords = formatResult(binlogRowDataList, dataType);
        assertEqualsInAnyOrder(Arrays.asList(binlogExpectedRecords), binlogActualRecords);
    }
}

/**
 * Builds a {@link RowDataDebeziumDeserializeSchema} for the given row data type.
 *
 * @param dataType type of the produced rows; its logical type is cast to {@link RowType}
 * @return the deserialization schema configured with the physical row type and the matching
 *     result type information
 */
private RowDataDebeziumDeserializeSchema buildRowDataDebeziumDeserializeSchema(
        DataType dataType) {
    final InternalTypeInfo<RowData> resultTypeInfo =
            InternalTypeInfo.of(TypeConversions.fromDataToLogicalType(dataType));
    final RowType physicalRowType = (RowType) dataType.getLogicalType();
    return RowDataDebeziumDeserializeSchema.newBuilder()
            .setPhysicalRowType(physicalRowType)
            .setResultTypeInfo(resultTypeInfo)
            .build();
}

/**
 * Converts each internal row to its external representation and renders it as a string.
 *
 * @param records rows to format
 * @param dataType data type used to create the row converter
 * @return one string per input row, in input order
 */
private List<String> formatResult(List<RowData> records, DataType dataType) {
    final RowRowConverter converter = RowRowConverter.create(dataType);
    converter.open(Thread.currentThread().getContextClassLoader());

    final List<String> formatted = new ArrayList<>(records.size());
    for (RowData record : records) {
        formatted.add(converter.toExternal(record).toString());
    }
    return formatted;
}

/**
 * Creates a MySQL connection from the container's host and port and the inventory database
 * credentials, with the server time zone set to UTC.
 *
 * @return the connection created by {@code MySqlConnectionUtils.createMySqlConnection}
 */
private MySqlConnection getConnection() {
    final Map<String, String> dbzProperties = new HashMap<>();
    dbzProperties.put("database.hostname", MYSQL_CONTAINER.getHost());
    dbzProperties.put("database.port", String.valueOf(MYSQL_CONTAINER.getDatabasePort()));
    dbzProperties.put("database.user", inventoryDatabase.getUsername());
    dbzProperties.put("database.password", inventoryDatabase.getPassword());
    dbzProperties.put("database.serverTimezone", ZoneId.of("UTC").toString());

    return MySqlConnectionUtils.createMySqlConnection(
            io.debezium.config.Configuration.from(dbzProperties));
}

/**
 * Runs an update, an insert, a delete and a reverting update against the given table with
 * auto-commit disabled, commits them, and always closes the connection afterwards.
 *
 * @param connection connection to execute the statements on; closed before returning
 * @param tableId name of the table to modify, as used in the SQL statements
 * @throws SQLException declared for failures of the underlying JDBC calls
 */
private void makeBinlogEvents(JdbcConnection connection, String tableId) throws SQLException {
    try {
        connection.setAutoCommit(false);

        // make binlog events
        final String renameToCart = "UPDATE " + tableId + " SET name = 'cart' where id = 103";
        final String insertSpareTire =
                "INSERT INTO "
                        + tableId
                        + " VALUES(110,'spare tire','28 inch spare tire','26.2')";
        final String deleteSpareTire = "DELETE FROM " + tableId + " where id = 110";
        final String restoreName =
                "UPDATE " + tableId + " SET name = '12-pack drill bits' where id = 103";

        connection.execute(renameToCart, insertSpareTire, deleteSpareTire, restoreName);
        connection.commit();
    } finally {
        connection.close();
    }
}

/**
 * Asserts that both lists are non-null and contain the same strings, ignoring order, by
 * comparing naturally sorted copies of the two lists.
 *
 * @param expected expected strings
 * @param actual actual strings
 */
public static void assertEqualsInAnyOrder(List<String> expected, List<String> actual) {
    final boolean bothPresent = expected != null && actual != null;
    assertTrue(bothPresent);

    final List<String> sortedExpected = expected.stream().sorted().collect(Collectors.toList());
    final List<String> sortedActual = actual.stream().sorted().collect(Collectors.toList());
    assertEqualsInOrder(sortedExpected, sortedActual);
}

/**
 * Asserts that both lists are non-null, have the same size, and hold equal strings at every
 * position.
 *
 * @param expected expected strings, in order
 * @param actual actual strings, in order
 */
public static void assertEqualsInOrder(List<String> expected, List<String> actual) {
    assertTrue(!(expected == null || actual == null));
    assertEquals(expected.size(), actual.size());

    final String[] expectedArray = expected.toArray(new String[0]);
    final String[] actualArray = actual.toArray(new String[0]);
    assertArrayEquals(expectedArray, actualArray);
}

private static MySqlContainer createMySqlContainer(MySqlVersion version) {
return (MySqlContainer)
new MySqlContainer(version)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,14 @@ public TableChange queryTableSchema(JdbcConnection jdbc, TableId tableId) {
}

@Override
public MySqlSourceFetchTaskContext createFetchTaskContext(SourceSplitBase sourceSplitBase) {
public MySqlSourceFetchTaskContext createFetchTaskContext(
SourceSplitBase sourceSplitBase, JdbcSourceConfig taskSourceConfig) {
final MySqlConnection jdbcConnection =
createMySqlConnection(sourceConfig.getDbzConfiguration());
createMySqlConnection(taskSourceConfig.getDbzConfiguration());
final BinaryLogClient binaryLogClient =
createBinaryClient(sourceConfig.getDbzConfiguration());
return new MySqlSourceFetchTaskContext(sourceConfig, this, jdbcConnection, binaryLogClient);
createBinaryClient(taskSourceConfig.getDbzConfiguration());
return new MySqlSourceFetchTaskContext(
taskSourceConfig, this, jdbcConnection, binaryLogClient);
}

@Override
Expand Down