From cb1b232794734404fa0755cf5a55e7bb02241d88 Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Wed, 28 Aug 2024 21:45:48 +0800 Subject: [PATCH] [minor][tests] Fix test testDanglingDroppingTableDuringBinlogMode due to imprecise timestamp startup This closes #3580 --- .../mysql/source/MySqlPipelineITCase.java | 74 +++++++++++++++++++ .../cdc/pipeline/tests/MysqlE2eITCase.java | 46 +++++++++--- 2 files changed, 108 insertions(+), 12 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java index 82f7b5e7b8..f2b4ccb82f 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java @@ -38,6 +38,7 @@ import org.apache.flink.cdc.common.types.DataType; import org.apache.flink.cdc.common.types.DataTypes; import org.apache.flink.cdc.common.types.RowType; +import org.apache.flink.cdc.common.utils.Preconditions; import org.apache.flink.cdc.connectors.mysql.factory.MySqlDataSourceFactory; import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory; import org.apache.flink.cdc.connectors.mysql.table.StartupOptions; @@ -57,6 +58,7 @@ import org.testcontainers.lifecycle.Startables; import java.sql.Connection; +import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; @@ -689,6 +691,78 @@ public void testSchemaChangeEvents() throws Exception { actual.stream().map(Object::toString).collect(Collectors.toList())); } + @Test + public void testDanglingDropTableEventInBinlog() throws Exception { + env.setParallelism(1); + inventoryDatabase.createAndInitialize(); + + // Create a new table for later deletion + try (Connection connection = inventoryDatabase.getJdbcConnection(); + Statement statement = connection.createStatement()) { + statement.execute("CREATE TABLE live_fast(ID INT PRIMARY KEY);"); + } + + String logFileName = null; + Long logPosition = null; + + try (Connection connection = inventoryDatabase.getJdbcConnection(); + Statement statement = connection.createStatement()) { + ResultSet rs = statement.executeQuery("SHOW BINARY LOGS;"); + while (rs.next()) { + logFileName = rs.getString("Log_name"); + logPosition = rs.getLong("File_size"); + } + } + + // We start reading binlog from the tail of current position and file to avoid reading + // previous events. The next DDL event (DROP TABLE) will push binlog position forward. + Preconditions.checkNotNull(logFileName, "Log file name must not be null"); + Preconditions.checkNotNull(logPosition, "Log position name must not be null"); + LOG.info("Trying to restore from {} @ {}...", logFileName, logPosition); + + try (Connection connection = inventoryDatabase.getJdbcConnection(); + Statement statement = connection.createStatement()) { + statement.execute("DROP TABLE live_fast;"); + } + + MySqlSourceConfigFactory configFactory = + new MySqlSourceConfigFactory() + .hostname(MYSQL8_CONTAINER.getHost()) + .port(MYSQL8_CONTAINER.getDatabasePort()) + .username(TEST_USER) + .password(TEST_PASSWORD) + .databaseList(inventoryDatabase.getDatabaseName()) + .tableList(inventoryDatabase.getDatabaseName() + ".*") + .startupOptions(StartupOptions.specificOffset(logFileName, logPosition)) + .serverId(getServerId(env.getParallelism())) + .serverTimeZone("UTC") + .includeSchemaChanges(SCHEMA_CHANGE_ENABLED.defaultValue()); + + FlinkSourceProvider sourceProvider = + (FlinkSourceProvider) new MySqlDataSource(configFactory).getEventSourceProvider(); + CloseableIterator events = + env.fromSource( + sourceProvider.getSource(), + WatermarkStrategy.noWatermarks(), + MySqlDataSourceFactory.IDENTIFIER, + new EventTypeInfo()) + .executeAndCollect(); + Thread.sleep(5_000); + + List expectedEvents = + new ArrayList<>( + getInventoryCreateAllTableEvents(inventoryDatabase.getDatabaseName())); + + expectedEvents.add( + new DropTableEvent( + TableId.tableId(inventoryDatabase.getDatabaseName(), "live_fast"))); + + List actual = fetchResults(events, expectedEvents.size()); + assertEqualsInAnyOrder( + expectedEvents.stream().map(Object::toString).collect(Collectors.toList()), + actual.stream().map(Object::toString).collect(Collectors.toList())); + } + private CreateTableEvent getProductsCreateTableEvent(TableId tableId) { return new CreateTableEvent( tableId, diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java index ccdbbbad72..a8c7a8c5f6 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java @@ -18,6 +18,7 @@ package org.apache.flink.cdc.pipeline.tests; import org.apache.flink.cdc.common.test.utils.TestUtils; +import org.apache.flink.cdc.common.utils.Preconditions; import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer; import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion; import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase; @@ -36,6 +37,7 @@ import java.nio.file.Path; import java.sql.Connection; import java.sql.DriverManager; +import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.time.Duration; @@ -333,16 +335,34 @@ public void testSchemaChangeEvents() throws Exception { } @Test - public void testDroppingTable() throws Exception { - Thread.sleep(5000); - LOG.info("Sleep 5 seconds to distinguish initial DDL events with dropping table events..."); - long ddlTimestamp = System.currentTimeMillis(); - Thread.sleep(5000); - LOG.info("Going to drop tables after timestamp {}", ddlTimestamp); + public void testDanglingDropTableEventInBinlog() throws Exception { + // Create a new table for later deletion + try (Connection connection = mysqlInventoryDatabase.getJdbcConnection(); + Statement statement = connection.createStatement()) { + statement.execute("CREATE TABLE live_fast(ID INT PRIMARY KEY);"); + } + + String logFileName = null; + Long logPosition = null; + + try (Connection connection = mysqlInventoryDatabase.getJdbcConnection(); + Statement statement = connection.createStatement()) { + ResultSet rs = statement.executeQuery("SHOW BINARY LOGS;"); + while (rs.next()) { + logFileName = rs.getString("Log_name"); + logPosition = rs.getLong("File_size"); + } + } + + // We start reading binlog from the tail of current position and file to avoid reading + // previous events. The next DDL event (DROP TABLE) will push binlog position forward. + Preconditions.checkNotNull(logFileName, "Log file name must not be null"); + Preconditions.checkNotNull(logPosition, "Log position name must not be null"); + LOG.info("Trying to restore from {} @ {}...", logFileName, logPosition); try (Connection connection = mysqlInventoryDatabase.getJdbcConnection(); Statement statement = connection.createStatement()) { - statement.execute("DROP TABLE products;"); + statement.execute("DROP TABLE live_fast;"); } String pipelineJob = @@ -356,8 +376,9 @@ public void testDroppingTable() throws Exception { + " tables: %s.\\.*\n" + " server-id: 5400-5404\n" + " server-time-zone: UTC\n" - + " scan.startup.mode: timestamp\n" - + " scan.startup.timestamp-millis: %d\n" + + " scan.startup.mode: specific-offset\n" + + " scan.startup.specific-offset.file: %s\n" + + " scan.startup.specific-offset.pos: %d\n" + " scan.binlog.newly-added-table.enabled: true\n" + "\n" + "sink:\n" @@ -370,7 +391,8 @@ public void testDroppingTable() throws Exception { MYSQL_TEST_USER, MYSQL_TEST_PASSWORD, mysqlInventoryDatabase.getDatabaseName(), - ddlTimestamp, + logFileName, + logPosition, parallelism); Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar"); @@ -380,13 +402,13 @@ public void testDroppingTable() throws Exception { LOG.info("Pipeline job is running"); waitUntilSpecificEvent( String.format( - "Table %s.products received SchemaChangeEvent DropTableEvent{tableId=%s.products} and start to be blocked.", + "Table %s.live_fast received SchemaChangeEvent DropTableEvent{tableId=%s.live_fast} and start to be blocked.", mysqlInventoryDatabase.getDatabaseName(), mysqlInventoryDatabase.getDatabaseName())); waitUntilSpecificEvent( String.format( - "Schema change event DropTableEvent{tableId=%s.products} has been handled in another subTask already.", + "Schema change event DropTableEvent{tableId=%s.live_fast} has been handled in another subTask already.", mysqlInventoryDatabase.getDatabaseName())); }