Skip to content

Commit

Permalink
[minor][tests] Fix test testDanglingDroppingTableDuringBinlogMode due…
Browse files Browse the repository at this point in the history
… to imprecise timestamp startup

This closes  #3580
  • Loading branch information
yuxiqian authored Aug 28, 2024
1 parent 0e9a176 commit cb1b232
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.types.RowType;
import org.apache.flink.cdc.common.utils.Preconditions;
import org.apache.flink.cdc.connectors.mysql.factory.MySqlDataSourceFactory;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
Expand All @@ -57,6 +58,7 @@
import org.testcontainers.lifecycle.Startables;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
Expand Down Expand Up @@ -689,6 +691,78 @@ public void testSchemaChangeEvents() throws Exception {
actual.stream().map(Object::toString).collect(Collectors.toList()));
}

@Test
public void testDanglingDropTableEventInBinlog() throws Exception {
env.setParallelism(1);
inventoryDatabase.createAndInitialize();

// Create a new table for later deletion
try (Connection connection = inventoryDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute("CREATE TABLE live_fast(ID INT PRIMARY KEY);");
}

String logFileName = null;
Long logPosition = null;

try (Connection connection = inventoryDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) {
ResultSet rs = statement.executeQuery("SHOW BINARY LOGS;");
while (rs.next()) {
logFileName = rs.getString("Log_name");
logPosition = rs.getLong("File_size");
}
}

// We start reading binlog from the tail of current position and file to avoid reading
// previous events. The next DDL event (DROP TABLE) will push binlog position forward.
Preconditions.checkNotNull(logFileName, "Log file name must not be null");
Preconditions.checkNotNull(logPosition, "Log position name must not be null");
LOG.info("Trying to restore from {} @ {}...", logFileName, logPosition);

try (Connection connection = inventoryDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute("DROP TABLE live_fast;");
}

MySqlSourceConfigFactory configFactory =
new MySqlSourceConfigFactory()
.hostname(MYSQL8_CONTAINER.getHost())
.port(MYSQL8_CONTAINER.getDatabasePort())
.username(TEST_USER)
.password(TEST_PASSWORD)
.databaseList(inventoryDatabase.getDatabaseName())
.tableList(inventoryDatabase.getDatabaseName() + ".*")
.startupOptions(StartupOptions.specificOffset(logFileName, logPosition))
.serverId(getServerId(env.getParallelism()))
.serverTimeZone("UTC")
.includeSchemaChanges(SCHEMA_CHANGE_ENABLED.defaultValue());

FlinkSourceProvider sourceProvider =
(FlinkSourceProvider) new MySqlDataSource(configFactory).getEventSourceProvider();
CloseableIterator<Event> events =
env.fromSource(
sourceProvider.getSource(),
WatermarkStrategy.noWatermarks(),
MySqlDataSourceFactory.IDENTIFIER,
new EventTypeInfo())
.executeAndCollect();
Thread.sleep(5_000);

List<Event> expectedEvents =
new ArrayList<>(
getInventoryCreateAllTableEvents(inventoryDatabase.getDatabaseName()));

expectedEvents.add(
new DropTableEvent(
TableId.tableId(inventoryDatabase.getDatabaseName(), "live_fast")));

List<Event> actual = fetchResults(events, expectedEvents.size());
assertEqualsInAnyOrder(
expectedEvents.stream().map(Object::toString).collect(Collectors.toList()),
actual.stream().map(Object::toString).collect(Collectors.toList()));
}

private CreateTableEvent getProductsCreateTableEvent(TableId tableId) {
return new CreateTableEvent(
tableId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.flink.cdc.pipeline.tests;

import org.apache.flink.cdc.common.test.utils.TestUtils;
import org.apache.flink.cdc.common.utils.Preconditions;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
Expand All @@ -36,6 +37,7 @@
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
Expand Down Expand Up @@ -333,16 +335,34 @@ public void testSchemaChangeEvents() throws Exception {
}

@Test
public void testDroppingTable() throws Exception {
Thread.sleep(5000);
LOG.info("Sleep 5 seconds to distinguish initial DDL events with dropping table events...");
long ddlTimestamp = System.currentTimeMillis();
Thread.sleep(5000);
LOG.info("Going to drop tables after timestamp {}", ddlTimestamp);
public void testDanglingDropTableEventInBinlog() throws Exception {
// Create a new table for later deletion
try (Connection connection = mysqlInventoryDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute("CREATE TABLE live_fast(ID INT PRIMARY KEY);");
}

String logFileName = null;
Long logPosition = null;

try (Connection connection = mysqlInventoryDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) {
ResultSet rs = statement.executeQuery("SHOW BINARY LOGS;");
while (rs.next()) {
logFileName = rs.getString("Log_name");
logPosition = rs.getLong("File_size");
}
}

// We start reading binlog from the tail of current position and file to avoid reading
// previous events. The next DDL event (DROP TABLE) will push binlog position forward.
Preconditions.checkNotNull(logFileName, "Log file name must not be null");
Preconditions.checkNotNull(logPosition, "Log position name must not be null");
LOG.info("Trying to restore from {} @ {}...", logFileName, logPosition);

try (Connection connection = mysqlInventoryDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute("DROP TABLE products;");
statement.execute("DROP TABLE live_fast;");
}

String pipelineJob =
Expand All @@ -356,8 +376,9 @@ public void testDroppingTable() throws Exception {
+ " tables: %s.\\.*\n"
+ " server-id: 5400-5404\n"
+ " server-time-zone: UTC\n"
+ " scan.startup.mode: timestamp\n"
+ " scan.startup.timestamp-millis: %d\n"
+ " scan.startup.mode: specific-offset\n"
+ " scan.startup.specific-offset.file: %s\n"
+ " scan.startup.specific-offset.pos: %d\n"
+ " scan.binlog.newly-added-table.enabled: true\n"
+ "\n"
+ "sink:\n"
Expand All @@ -370,7 +391,8 @@ public void testDroppingTable() throws Exception {
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
mysqlInventoryDatabase.getDatabaseName(),
ddlTimestamp,
logFileName,
logPosition,
parallelism);
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
Expand All @@ -380,13 +402,13 @@ public void testDroppingTable() throws Exception {
LOG.info("Pipeline job is running");
waitUntilSpecificEvent(
String.format(
"Table %s.products received SchemaChangeEvent DropTableEvent{tableId=%s.products} and start to be blocked.",
"Table %s.live_fast received SchemaChangeEvent DropTableEvent{tableId=%s.live_fast} and start to be blocked.",
mysqlInventoryDatabase.getDatabaseName(),
mysqlInventoryDatabase.getDatabaseName()));

waitUntilSpecificEvent(
String.format(
"Schema change event DropTableEvent{tableId=%s.products} has been handled in another subTask already.",
"Schema change event DropTableEvent{tableId=%s.live_fast} has been handled in another subTask already.",
mysqlInventoryDatabase.getDatabaseName()));
}

Expand Down

0 comments on commit cb1b232

Please sign in to comment.