From 92ef28109c6329393bf2c1263386b5a731e9ff5b Mon Sep 17 00:00:00 2001 From: Akash Kulkarni Date: Thu, 15 Feb 2024 14:42:02 -0800 Subject: [PATCH 1/7] [Source-mysql] : Add config to throw an error on invalid CDC position --- .../connectors/source-mysql/metadata.yaml | 2 +- .../integrations/source/mysql/MySqlSpecConstants.java | 8 ++++++++ .../source/mysql/initialsync/MySqlInitialReadUtil.java | 7 +++++++ 3 files changed, 16 insertions(+), 1 deletion(-) create mode 100644 airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSpecConstants.java diff --git a/airbyte-integrations/connectors/source-mysql/metadata.yaml b/airbyte-integrations/connectors/source-mysql/metadata.yaml index 68a3b86807fe..f63c1375e894 100644 --- a/airbyte-integrations/connectors/source-mysql/metadata.yaml +++ b/airbyte-integrations/connectors/source-mysql/metadata.yaml @@ -9,7 +9,7 @@ data: connectorSubtype: database connectorType: source definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad - dockerImageTag: 3.3.7 + dockerImageTag: 3.3.8 dockerRepository: airbyte/source-mysql documentationUrl: https://docs.airbyte.com/integrations/sources/mysql githubIssueLabel: source-mysql diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSpecConstants.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSpecConstants.java new file mode 100644 index 000000000000..8fecfd57ba89 --- /dev/null +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSpecConstants.java @@ -0,0 +1,8 @@ +package io.airbyte.integrations.source.mysql; + +// Constants defined in airbyte-integrations/connectors/source-postgres/src/main/resources/spec.json. 
+public class MySqlSpecConstants { + public static final String INVALID_CDC_CURSOR_POSITION_PROPERTY = "invalid_cdc_cursor_position_behavior"; + public static final String FAIL_SYNC_OPTION = "Fail sync"; + public static final String RESYNC_DATA_OPTION = "Re-sync data"; +} diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialReadUtil.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialReadUtil.java index ff0b7a477e5c..8c7edd97720b 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialReadUtil.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialReadUtil.java @@ -7,6 +7,8 @@ import static io.airbyte.cdk.db.DbAnalyticsUtils.cdcCursorInvalidMessage; import static io.airbyte.integrations.source.mysql.MySqlQueryUtils.getTableSizeInfoForStreams; import static io.airbyte.integrations.source.mysql.MySqlQueryUtils.prettyPrintConfiguredAirbyteStreamList; +import static io.airbyte.integrations.source.mysql.MySqlSpecConstants.FAIL_SYNC_OPTION; +import static io.airbyte.integrations.source.mysql.MySqlSpecConstants.INVALID_CDC_CURSOR_POSITION_PROPERTY; import static io.airbyte.integrations.source.mysql.cdc.MysqlCdcStateConstants.MYSQL_CDC_OFFSET; import static io.airbyte.integrations.source.mysql.initialsync.MySqlInitialLoadGlobalStateManager.STATE_TYPE_KEY; import static io.airbyte.integrations.source.mysql.initialsync.MySqlInitialLoadStateManager.PRIMARY_KEY_STATE_TYPE; @@ -25,6 +27,7 @@ import io.airbyte.cdk.integrations.source.relationaldb.TableInfo; import io.airbyte.cdk.integrations.source.relationaldb.models.CdcState; import io.airbyte.cdk.integrations.source.relationaldb.state.StateManager; +import io.airbyte.commons.exceptions.ConfigErrorException; import 
io.airbyte.commons.json.Jsons; import io.airbyte.commons.util.AutoCloseableIterator; import io.airbyte.commons.util.AutoCloseableIterators; @@ -111,6 +114,10 @@ public static List> getCdcReadIterators(fi if (!savedOffsetStillPresentOnServer) { AirbyteTraceMessageUtility.emitAnalyticsTrace(cdcCursorInvalidMessage()); + if (!sourceConfig.get("replication_method").has(INVALID_CDC_CURSOR_POSITION_PROPERTY) || sourceConfig.get("replication_method").get( + INVALID_CDC_CURSOR_POSITION_PROPERTY).asText().equals(FAIL_SYNC_OPTION)) { + throw new ConfigErrorException("Saved offset no longer present on the server. Please increase binlog retention or reduce sync frequency. See for more details."); + } LOGGER.warn("Saved offset no longer present on the server, Airbyte is going to trigger a sync from scratch"); } From 866c7225dcf6398d78d3067aec4a653160c2aff7 Mon Sep 17 00:00:00 2001 From: Akash Kulkarni Date: Thu, 15 Feb 2024 15:52:09 -0800 Subject: [PATCH 2/7] More docs --- .../mysql/initialsync/MySqlInitialReadUtil.java | 2 +- docs/integrations/sources/mysql.md | 1 + .../sources/mysql/mysql-troubleshooting.md | 12 ++++++++++++ 3 files changed, 14 insertions(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialReadUtil.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialReadUtil.java index 8c7edd97720b..d387a1b51c3a 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialReadUtil.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialReadUtil.java @@ -116,7 +116,7 @@ public static List> getCdcReadIterators(fi AirbyteTraceMessageUtility.emitAnalyticsTrace(cdcCursorInvalidMessage()); if 
(!sourceConfig.get("replication_method").has(INVALID_CDC_CURSOR_POSITION_PROPERTY) || sourceConfig.get("replication_method").get( INVALID_CDC_CURSOR_POSITION_PROPERTY).asText().equals(FAIL_SYNC_OPTION)) { - throw new ConfigErrorException("Saved offset no longer present on the server. Please increase binlog retention or reduce sync frequency. See for more details."); + throw new ConfigErrorException("Saved offset no longer present on the server. Please increase binlog retention or reduce sync frequency. See https://docs.airbyte.com/integrations/sources/mysql/mysql-troubleshooting#under-cdc-incremental-mode-there-are-still-full-refresh-syncs for more details."); } LOGGER.warn("Saved offset no longer present on the server, Airbyte is going to trigger a sync from scratch"); } diff --git a/docs/integrations/sources/mysql.md b/docs/integrations/sources/mysql.md index 09117fd7e280..18e59658342d 100644 --- a/docs/integrations/sources/mysql.md +++ b/docs/integrations/sources/mysql.md @@ -223,6 +223,7 @@ Any database or table encoding combination of charset and collation is supported | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------| +| 3.3.8 | 2024-02-20 | [35338](https://github.com/airbytehq/airbyte/pull/35338) | Add config to throw an error on invalid CDC position. | | 3.3.7 | 2024-02-13 | [35036](https://github.com/airbytehq/airbyte/pull/34751) | Emit analytics message for invalid CDC cursor. | | 3.3.6 | 2024-02-13 | [34869](https://github.com/airbytehq/airbyte/pull/34573) | Don't emit state in SourceStateIterator when there is an underlying stream failure. 
| | 3.3.5 | 2024-02-12 | [34580](https://github.com/airbytehq/airbyte/pull/34580) | Support special chars in db name | diff --git a/docs/integrations/sources/mysql/mysql-troubleshooting.md b/docs/integrations/sources/mysql/mysql-troubleshooting.md index aee512157839..7e6265d0b867 100644 --- a/docs/integrations/sources/mysql/mysql-troubleshooting.md +++ b/docs/integrations/sources/mysql/mysql-troubleshooting.md @@ -18,6 +18,18 @@ * Amazon RDS MySQL or MariaDB connection issues: If you see the following `Cannot create a PoolableConnectionFactory` error, please add `enabledTLSProtocols=TLSv1.2` in the JDBC parameters. * Amazon RDS MySQL connection issues: If you see `Error: HikariPool-1 - Connection is not available, request timed out after 30001ms.`, many times this due to your VPC not allowing public traffic. We recommend going through [this AWS troubleshooting checklist](https://aws.amazon.com/premiumsupport/knowledge-center/rds-cannot-connect/) to ensure the correct permissions/settings have been granted to allow Airbyte to connect to your database. +### Under CDC incremental mode, there are still full refresh syncs + +Normally under the CDC mode, the MySQL source will first run a full refresh sync to read the snapshot of all the existing data, and all subsequent runs will only be incremental syncs reading from the binlogs. However, occasionally, you may see full refresh syncs after the initial run. When this happens, you will see the following log: + +> Saved offset no longer present on the server, Airbyte is going to trigger a sync from scratch + +The root cause is that the binlogs needed for the incremental sync have been removed by MySQL. This can occur under the following scenarios: + +- When the binlog files that the saved offset points to are purged by MySQL before the next sync reads them, for example because the binlog retention period expires between syncs. This scenario is preventable. Possible solutions include: + - Sync the data source more frequently. 
+ - Set a higher `binlog_expire_logs_seconds`. It's recommended to set this value to a time period of 7 days. See detailed documentation [here](https://dev.mysql.com/doc/refman/8.0/en/replication-options-binary-log.html#sysvar_binlog_expire_logs_seconds). The downside of this approach is that more disk space will be needed. + ### EventDataDeserializationException errors during initial snapshot When a sync runs for the first time using CDC, Airbyte performs an initial consistent snapshot of your database. Airbyte doesn't acquire any table locks \(for tables defined with MyISAM engine, the tables would still be locked\) while creating the snapshot to allow writes by other database clients. But in order for the sync to work without any error/unexpected behaviour, it is assumed that no schema changes are happening while the snapshot is running. From 08820c65a51c0792817bf23e9da8b681c907ae8f Mon Sep 17 00:00:00 2001 From: Akash Kulkarni Date: Fri, 16 Feb 2024 12:45:38 -0800 Subject: [PATCH 3/7] test --- .../source/mysql/MySqlSpecConstants.java | 9 +++- .../initialsync/MySqlInitialReadUtil.java | 3 +- .../source-mysql/src/main/resources/spec.json | 8 +++ .../resources/expected_cloud_spec.json | 8 +++ .../resources/expected_oss_spec.json | 8 +++ .../source/mysql/CdcMysqlSourceTest.java | 53 +++++++++++++++++++ .../test/resources/expected_cloud_spec.json | 8 +++ .../src/test/resources/expected_oss_spec.json | 8 +++ .../source/mysql/MySQLTestDatabase.java | 8 +++ 9 files changed, 111 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSpecConstants.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSpecConstants.java index 8fecfd57ba89..7735470482da 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSpecConstants.java +++ 
b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSpecConstants.java @@ -1,8 +1,15 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + package io.airbyte.integrations.source.mysql; -// Constants defined in airbyte-integrations/connectors/source-postgres/src/main/resources/spec.json. +// Constants defined in +// airbyte-integrations/connectors/source-mysql/src/main/resources/spec.json. public class MySqlSpecConstants { + public static final String INVALID_CDC_CURSOR_POSITION_PROPERTY = "invalid_cdc_cursor_position_behavior"; public static final String FAIL_SYNC_OPTION = "Fail sync"; public static final String RESYNC_DATA_OPTION = "Re-sync data"; + } diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialReadUtil.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialReadUtil.java index d387a1b51c3a..0eac8caaaefb 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialReadUtil.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialReadUtil.java @@ -116,7 +116,8 @@ public static List> getCdcReadIterators(fi AirbyteTraceMessageUtility.emitAnalyticsTrace(cdcCursorInvalidMessage()); if (!sourceConfig.get("replication_method").has(INVALID_CDC_CURSOR_POSITION_PROPERTY) || sourceConfig.get("replication_method").get( INVALID_CDC_CURSOR_POSITION_PROPERTY).asText().equals(FAIL_SYNC_OPTION)) { - throw new ConfigErrorException("Saved offset no longer present on the server. Please increase binlog retention or reduce sync frequency. 
See https://docs.airbyte.com/integrations/sources/mysql/mysql-troubleshooting#under-cdc-incremental-mode-there-are-still-full-refresh-syncs for more details."); + throw new ConfigErrorException( + "Saved offset no longer present on the server. Please increase binlog retention or reduce sync frequency. See https://docs.airbyte.com/integrations/sources/mysql/mysql-troubleshooting#under-cdc-incremental-mode-there-are-still-full-refresh-syncs for more details."); } LOGGER.warn("Saved offset no longer present on the server, Airbyte is going to trigger a sync from scratch"); } diff --git a/airbyte-integrations/connectors/source-mysql/src/main/resources/spec.json b/airbyte-integrations/connectors/source-mysql/src/main/resources/spec.json index 841fa1f3bdba..decdcec29476 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/source-mysql/src/main/resources/spec.json @@ -211,6 +211,14 @@ "description": "Enter the configured MySQL server timezone. This should only be done if the configured timezone in your MySQL instance does not conform to IANNA standard.", "order": 2, "always_show": true + }, + "invalid_cdc_cursor_position_behavior": { + "type": "string", + "title": "Invalid CDC position behavior (Advanced)", + "description": "Determines whether Airbyte should fail or re-sync data in case of an stale/invalid cursor value into the WAL. If 'Fail sync' is chosen, a user will have to manually reset the connection before being able to continue syncing data. 
If 'Re-sync data' is chosen, Airbyte will automatically trigger a refresh but could lead to higher cloud costs and data loss.", + "enum": ["Fail sync", "Re-sync data"], + "default": "Fail sync", + "order": 3 } } }, diff --git a/airbyte-integrations/connectors/source-mysql/src/test-integration/resources/expected_cloud_spec.json b/airbyte-integrations/connectors/source-mysql/src/test-integration/resources/expected_cloud_spec.json index 50d717a95886..73a9c1485b81 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test-integration/resources/expected_cloud_spec.json +++ b/airbyte-integrations/connectors/source-mysql/src/test-integration/resources/expected_cloud_spec.json @@ -189,6 +189,14 @@ "description": "Enter the configured MySQL server timezone. This should only be done if the configured timezone in your MySQL instance does not conform to IANNA standard.", "order": 2, "always_show": true + }, + "invalid_cdc_cursor_position_behavior": { + "type": "string", + "title": "Invalid CDC position behavior (Advanced)", + "description": "Determines whether Airbyte should fail or re-sync data in case of an stale/invalid cursor value into the WAL. If 'Fail sync' is chosen, a user will have to manually reset the connection before being able to continue syncing data. 
If 'Re-sync data' is chosen, Airbyte will automatically trigger a refresh but could lead to higher cloud costs and data loss.", + "enum": ["Fail sync", "Re-sync data"], + "default": "Fail sync", + "order": 3 } } }, diff --git a/airbyte-integrations/connectors/source-mysql/src/test-integration/resources/expected_oss_spec.json b/airbyte-integrations/connectors/source-mysql/src/test-integration/resources/expected_oss_spec.json index 1a884d8de813..e29c8621cb92 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test-integration/resources/expected_oss_spec.json +++ b/airbyte-integrations/connectors/source-mysql/src/test-integration/resources/expected_oss_spec.json @@ -211,6 +211,14 @@ "description": "Enter the configured MySQL server timezone. This should only be done if the configured timezone in your MySQL instance does not conform to IANNA standard.", "order": 2, "always_show": true + }, + "invalid_cdc_cursor_position_behavior": { + "type": "string", + "title": "Invalid CDC position behavior (Advanced)", + "description": "Determines whether Airbyte should fail or re-sync data in case of an stale/invalid cursor value into the WAL. If 'Fail sync' is chosen, a user will have to manually reset the connection before being able to continue syncing data. 
If 'Re-sync data' is chosen, Airbyte will automatically trigger a refresh but could lead to higher cloud costs and data loss.", + "enum": ["Fail sync", "Re-sync data"], + "default": "Fail sync", + "order": 3 } } }, diff --git a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java index 5825ae0848df..dc0541b0bb5e 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java @@ -10,6 +10,7 @@ import static io.airbyte.integrations.source.mysql.MySqlSource.CDC_DEFAULT_CURSOR; import static io.airbyte.integrations.source.mysql.MySqlSource.CDC_LOG_FILE; import static io.airbyte.integrations.source.mysql.MySqlSource.CDC_LOG_POS; +import static io.airbyte.integrations.source.mysql.MySqlSpecConstants.FAIL_SYNC_OPTION; import static io.airbyte.integrations.source.mysql.cdc.MysqlCdcStateConstants.IS_COMPRESSED; import static io.airbyte.integrations.source.mysql.cdc.MysqlCdcStateConstants.MYSQL_CDC_OFFSET; import static io.airbyte.integrations.source.mysql.cdc.MysqlCdcStateConstants.MYSQL_DB_HISTORY; @@ -20,6 +21,7 @@ import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import com.fasterxml.jackson.databind.JsonNode; @@ -33,6 +35,7 @@ import io.airbyte.cdk.db.jdbc.JdbcDatabase; import io.airbyte.cdk.integrations.debezium.CdcSourceTest; import io.airbyte.cdk.integrations.debezium.internals.AirbyteSchemaHistoryStorage; +import 
io.airbyte.commons.exceptions.ConfigErrorException; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.util.AutoCloseableIterator; import io.airbyte.commons.util.AutoCloseableIterators; @@ -279,6 +282,56 @@ protected void syncShouldHandlePurgedLogsGracefully() throws Exception { "Expected 46 records to be replicated in the second sync."); } + @Test + void testSyncShouldFailPurgedLogs() throws Exception { + JsonNode config = testdb.testConfigBuilder() + .withCdcReplication(FAIL_SYNC_OPTION) + .with(SYNC_CHECKPOINT_RECORDS_PROPERTY, 1) + .build(); + final int recordsToCreate = 20; + // first batch of records. 20 created here and 6 created in setup method. + for (int recordsCreated = 0; recordsCreated < recordsToCreate; recordsCreated++) { + final JsonNode record = + Jsons.jsonNode(ImmutableMap + .of(COL_ID, 100 + recordsCreated, COL_MAKE_ID, 1, COL_MODEL, + "F-" + recordsCreated)); + writeModelRecord(record); + } + + final AutoCloseableIterator firstBatchIterator = source() + .read(config, getConfiguredCatalog(), null); + final List dataFromFirstBatch = AutoCloseableIterators + .toListAndClose(firstBatchIterator); + final List stateAfterFirstBatch = extractStateMessages(dataFromFirstBatch); + assertStateForSyncShouldHandlePurgedLogsGracefully(stateAfterFirstBatch, 1); + final Set recordsFromFirstBatch = extractRecordMessages( + dataFromFirstBatch); + + final int recordsCreatedBeforeTestCount = MODEL_RECORDS.size(); + assertEquals((recordsCreatedBeforeTestCount + recordsToCreate), recordsFromFirstBatch.size()); + // sometimes there can be more than one of these at the end of the snapshot and just before the + // first incremental. 
+ final Set recordsFromFirstBatchWithoutDuplicates = removeDuplicates( + recordsFromFirstBatch); + + assertTrue(recordsCreatedBeforeTestCount < recordsFromFirstBatchWithoutDuplicates.size(), + "Expected first sync to include records created while the test was running."); + + // second batch of records again 20 being created + for (int recordsCreated = 0; recordsCreated < recordsToCreate; recordsCreated++) { + final JsonNode record = + Jsons.jsonNode(ImmutableMap + .of(COL_ID, 200 + recordsCreated, COL_MAKE_ID, 1, COL_MODEL, + "F-" + recordsCreated)); + writeModelRecord(record); + } + + purgeAllBinaryLogs(); + + final JsonNode state = Jsons.jsonNode(Collections.singletonList(stateAfterFirstBatch.get(stateAfterFirstBatch.size() - 1))); + assertThrows(ConfigErrorException.class, () -> source().read(config, getConfiguredCatalog(), state)); + } + /** * This test verifies that multiple states are sent during the CDC process based on number of * records. We can ensure that more than one `STATE` type of message is sent, but we are not able to diff --git a/airbyte-integrations/connectors/source-mysql/src/test/resources/expected_cloud_spec.json b/airbyte-integrations/connectors/source-mysql/src/test/resources/expected_cloud_spec.json index 52441e124b17..1e60848e93d7 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/resources/expected_cloud_spec.json +++ b/airbyte-integrations/connectors/source-mysql/src/test/resources/expected_cloud_spec.json @@ -205,6 +205,14 @@ "description": "Enter the configured MySQL server timezone. This should only be done if the configured timezone in your MySQL instance does not conform to IANNA standard.", "order": 2, "always_show": true + }, + "invalid_cdc_cursor_position_behavior": { + "type": "string", + "title": "Invalid CDC position behavior (Advanced)", + "description": "Determines whether Airbyte should fail or re-sync data in case of an stale/invalid cursor value into the WAL. 
If 'Fail sync' is chosen, a user will have to manually reset the connection before being able to continue syncing data. If 'Re-sync data' is chosen, Airbyte will automatically trigger a refresh but could lead to higher cloud costs and data loss.", + "enum": ["Fail sync", "Re-sync data"], + "default": "Fail sync", + "order": 3 } } }, diff --git a/airbyte-integrations/connectors/source-mysql/src/test/resources/expected_oss_spec.json b/airbyte-integrations/connectors/source-mysql/src/test/resources/expected_oss_spec.json index 841fa1f3bdba..decdcec29476 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/resources/expected_oss_spec.json +++ b/airbyte-integrations/connectors/source-mysql/src/test/resources/expected_oss_spec.json @@ -211,6 +211,14 @@ "description": "Enter the configured MySQL server timezone. This should only be done if the configured timezone in your MySQL instance does not conform to IANNA standard.", "order": 2, "always_show": true + }, + "invalid_cdc_cursor_position_behavior": { + "type": "string", + "title": "Invalid CDC position behavior (Advanced)", + "description": "Determines whether Airbyte should fail or re-sync data in case of an stale/invalid cursor value into the WAL. If 'Fail sync' is chosen, a user will have to manually reset the connection before being able to continue syncing data. 
If 'Re-sync data' is chosen, Airbyte will automatically trigger a refresh but could lead to higher cloud costs and data loss.", + "enum": ["Fail sync", "Re-sync data"], + "default": "Fail sync", + "order": 3 } } }, diff --git a/airbyte-integrations/connectors/source-mysql/src/testFixtures/java/io/airbyte/integrations/source/mysql/MySQLTestDatabase.java b/airbyte-integrations/connectors/source-mysql/src/testFixtures/java/io/airbyte/integrations/source/mysql/MySQLTestDatabase.java index fc031ed541c7..219d5e90f479 100644 --- a/airbyte-integrations/connectors/source-mysql/src/testFixtures/java/io/airbyte/integrations/source/mysql/MySQLTestDatabase.java +++ b/airbyte-integrations/connectors/source-mysql/src/testFixtures/java/io/airbyte/integrations/source/mysql/MySQLTestDatabase.java @@ -4,6 +4,9 @@ package io.airbyte.integrations.source.mysql; +import static io.airbyte.integrations.source.mysql.MySqlSpecConstants.INVALID_CDC_CURSOR_POSITION_PROPERTY; +import static io.airbyte.integrations.source.mysql.MySqlSpecConstants.RESYNC_DATA_OPTION; + import com.google.common.collect.ImmutableMap; import io.airbyte.cdk.db.factory.DatabaseDriver; import io.airbyte.cdk.testutils.TestDatabase; @@ -128,12 +131,17 @@ public MySQLConfigBuilder withStandardReplication() { } public MySQLConfigBuilder withCdcReplication() { + return withCdcReplication(RESYNC_DATA_OPTION); + } + + public MySQLConfigBuilder withCdcReplication(String cdcCursorFailBehaviour) { return this .with("is_test", true) .with("replication_method", ImmutableMap.builder() .put("method", "CDC") .put("initial_waiting_seconds", 5) .put("server_time_zone", "America/Los_Angeles") + .put(INVALID_CDC_CURSOR_POSITION_PROPERTY, cdcCursorFailBehaviour) .build()); } From c499685cf7fce1fc3d9bed6c820e4860e9e46a7b Mon Sep 17 00:00:00 2001 From: Akash Kulkarni Date: Fri, 16 Feb 2024 14:20:17 -0800 Subject: [PATCH 4/7] Fix tests & error message --- .../source/mysql/initialsync/MySqlInitialReadUtil.java | 2 +- 
.../io/airbyte/integrations/source/mysql/MySQLTestDatabase.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialReadUtil.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialReadUtil.java index 0eac8caaaefb..47aa83ee09fc 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialReadUtil.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialReadUtil.java @@ -117,7 +117,7 @@ public static List> getCdcReadIterators(fi if (!sourceConfig.get("replication_method").has(INVALID_CDC_CURSOR_POSITION_PROPERTY) || sourceConfig.get("replication_method").get( INVALID_CDC_CURSOR_POSITION_PROPERTY).asText().equals(FAIL_SYNC_OPTION)) { throw new ConfigErrorException( - "Saved offset no longer present on the server. Please increase binlog retention or reduce sync frequency. See https://docs.airbyte.com/integrations/sources/mysql/mysql-troubleshooting#under-cdc-incremental-mode-there-are-still-full-refresh-syncs for more details."); + "Saved offset no longer present on the server. Please reset the connection, and then increase binlog retention or reduce sync frequency. 
See https://docs.airbyte.com/integrations/sources/mysql/mysql-troubleshooting#under-cdc-incremental-mode-there-are-still-full-refresh-syncs for more details."); } LOGGER.warn("Saved offset no longer present on the server, Airbyte is going to trigger a sync from scratch"); } diff --git a/airbyte-integrations/connectors/source-mysql/src/testFixtures/java/io/airbyte/integrations/source/mysql/MySQLTestDatabase.java b/airbyte-integrations/connectors/source-mysql/src/testFixtures/java/io/airbyte/integrations/source/mysql/MySQLTestDatabase.java index 219d5e90f479..e9b8b18fdf0c 100644 --- a/airbyte-integrations/connectors/source-mysql/src/testFixtures/java/io/airbyte/integrations/source/mysql/MySQLTestDatabase.java +++ b/airbyte-integrations/connectors/source-mysql/src/testFixtures/java/io/airbyte/integrations/source/mysql/MySQLTestDatabase.java @@ -53,7 +53,7 @@ public enum ContainerModifier { static public MySQLTestDatabase in(BaseImage baseImage, ContainerModifier... methods) { String[] methodNames = Stream.of(methods).map(im -> im.methodName).toList().toArray(new String[0]); - final var container = new MySQLContainerFactory().shared(baseImage.reference, methodNames); + final var container = new MySQLContainerFactory().exclusive(baseImage.reference, methodNames); return new MySQLTestDatabase(container).initialized(); } From 9d657332889b84345c3dbcc8126214a8709f848d Mon Sep 17 00:00:00 2001 From: Akash Kulkarni Date: Mon, 19 Feb 2024 16:18:44 -0800 Subject: [PATCH 5/7] Fix mysql tests --- .../source-mysql/src/main/resources/spec.json | 3 +- .../source/mysql/CdcMysqlSourceTest.java | 48 +------------------ .../test/resources/expected_cloud_spec.json | 3 +- .../src/test/resources/expected_oss_spec.json | 3 +- .../source/mysql/MySQLTestDatabase.java | 2 +- 5 files changed, 9 insertions(+), 50 deletions(-) diff --git a/airbyte-integrations/connectors/source-mysql/src/main/resources/spec.json b/airbyte-integrations/connectors/source-mysql/src/main/resources/spec.json index 
decdcec29476..78450b13aabd 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/source-mysql/src/main/resources/spec.json @@ -218,7 +218,8 @@ "description": "Determines whether Airbyte should fail or re-sync data in case of an stale/invalid cursor value into the WAL. If 'Fail sync' is chosen, a user will have to manually reset the connection before being able to continue syncing data. If 'Re-sync data' is chosen, Airbyte will automatically trigger a refresh but could lead to higher cloud costs and data loss.", "enum": ["Fail sync", "Re-sync data"], "default": "Fail sync", - "order": 3 + "order": 3, + "always_show": true } } }, diff --git a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java index dc0541b0bb5e..b88b5baa6420 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java @@ -280,56 +280,12 @@ protected void syncShouldHandlePurgedLogsGracefully() throws Exception { dataFromSecondBatch); assertEquals((recordsToCreate * 2) + recordsCreatedBeforeTestCount, recordsFromSecondBatch.size(), "Expected 46 records to be replicated in the second sync."); - } - @Test - void testSyncShouldFailPurgedLogs() throws Exception { - JsonNode config = testdb.testConfigBuilder() + JsonNode failSyncConfig = testdb.testConfigBuilder() .withCdcReplication(FAIL_SYNC_OPTION) .with(SYNC_CHECKPOINT_RECORDS_PROPERTY, 1) .build(); - final int recordsToCreate = 20; - // first batch of records. 20 created here and 6 created in setup method. 
- for (int recordsCreated = 0; recordsCreated < recordsToCreate; recordsCreated++) { - final JsonNode record = - Jsons.jsonNode(ImmutableMap - .of(COL_ID, 100 + recordsCreated, COL_MAKE_ID, 1, COL_MODEL, - "F-" + recordsCreated)); - writeModelRecord(record); - } - - final AutoCloseableIterator firstBatchIterator = source() - .read(config, getConfiguredCatalog(), null); - final List dataFromFirstBatch = AutoCloseableIterators - .toListAndClose(firstBatchIterator); - final List stateAfterFirstBatch = extractStateMessages(dataFromFirstBatch); - assertStateForSyncShouldHandlePurgedLogsGracefully(stateAfterFirstBatch, 1); - final Set recordsFromFirstBatch = extractRecordMessages( - dataFromFirstBatch); - - final int recordsCreatedBeforeTestCount = MODEL_RECORDS.size(); - assertEquals((recordsCreatedBeforeTestCount + recordsToCreate), recordsFromFirstBatch.size()); - // sometimes there can be more than one of these at the end of the snapshot and just before the - // first incremental. - final Set recordsFromFirstBatchWithoutDuplicates = removeDuplicates( - recordsFromFirstBatch); - - assertTrue(recordsCreatedBeforeTestCount < recordsFromFirstBatchWithoutDuplicates.size(), - "Expected first sync to include records created while the test was running."); - - // second batch of records again 20 being created - for (int recordsCreated = 0; recordsCreated < recordsToCreate; recordsCreated++) { - final JsonNode record = - Jsons.jsonNode(ImmutableMap - .of(COL_ID, 200 + recordsCreated, COL_MAKE_ID, 1, COL_MODEL, - "F-" + recordsCreated)); - writeModelRecord(record); - } - - purgeAllBinaryLogs(); - - final JsonNode state = Jsons.jsonNode(Collections.singletonList(stateAfterFirstBatch.get(stateAfterFirstBatch.size() - 1))); - assertThrows(ConfigErrorException.class, () -> source().read(config, getConfiguredCatalog(), state)); + assertThrows(ConfigErrorException.class, () -> source().read(failSyncConfig, getConfiguredCatalog(), state)); } /** diff --git 
a/airbyte-integrations/connectors/source-mysql/src/test/resources/expected_cloud_spec.json b/airbyte-integrations/connectors/source-mysql/src/test/resources/expected_cloud_spec.json index 1e60848e93d7..66f0b3bdf647 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/resources/expected_cloud_spec.json +++ b/airbyte-integrations/connectors/source-mysql/src/test/resources/expected_cloud_spec.json @@ -212,7 +212,8 @@ "description": "Determines whether Airbyte should fail or re-sync data in case of an stale/invalid cursor value into the WAL. If 'Fail sync' is chosen, a user will have to manually reset the connection before being able to continue syncing data. If 'Re-sync data' is chosen, Airbyte will automatically trigger a refresh but could lead to higher cloud costs and data loss.", "enum": ["Fail sync", "Re-sync data"], "default": "Fail sync", - "order": 3 + "order": 3, + "always_show": true } } }, diff --git a/airbyte-integrations/connectors/source-mysql/src/test/resources/expected_oss_spec.json b/airbyte-integrations/connectors/source-mysql/src/test/resources/expected_oss_spec.json index decdcec29476..78450b13aabd 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/resources/expected_oss_spec.json +++ b/airbyte-integrations/connectors/source-mysql/src/test/resources/expected_oss_spec.json @@ -218,7 +218,8 @@ "description": "Determines whether Airbyte should fail or re-sync data in case of an stale/invalid cursor value into the WAL. If 'Fail sync' is chosen, a user will have to manually reset the connection before being able to continue syncing data. 
If 'Re-sync data' is chosen, Airbyte will automatically trigger a refresh but could lead to higher cloud costs and data loss.", "enum": ["Fail sync", "Re-sync data"], "default": "Fail sync", - "order": 3 + "order": 3, + "always_show": true } } }, diff --git a/airbyte-integrations/connectors/source-mysql/src/testFixtures/java/io/airbyte/integrations/source/mysql/MySQLTestDatabase.java b/airbyte-integrations/connectors/source-mysql/src/testFixtures/java/io/airbyte/integrations/source/mysql/MySQLTestDatabase.java index e9b8b18fdf0c..219d5e90f479 100644 --- a/airbyte-integrations/connectors/source-mysql/src/testFixtures/java/io/airbyte/integrations/source/mysql/MySQLTestDatabase.java +++ b/airbyte-integrations/connectors/source-mysql/src/testFixtures/java/io/airbyte/integrations/source/mysql/MySQLTestDatabase.java @@ -53,7 +53,7 @@ public enum ContainerModifier { static public MySQLTestDatabase in(BaseImage baseImage, ContainerModifier... methods) { String[] methodNames = Stream.of(methods).map(im -> im.methodName).toList().toArray(new String[0]); - final var container = new MySQLContainerFactory().exclusive(baseImage.reference, methodNames); + final var container = new MySQLContainerFactory().shared(baseImage.reference, methodNames); return new MySQLTestDatabase(container).initialized(); } From 951b59bdf52815d3cecc7b9de1ff0f3416ef2ebd Mon Sep 17 00:00:00 2001 From: Akash Kulkarni Date: Mon, 19 Feb 2024 16:41:37 -0800 Subject: [PATCH 6/7] Fix spec --- .../source-mysql/src/test/resources/expected_cloud_spec.json | 2 +- .../source-mysql/src/test/resources/expected_oss_spec.json | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/source-mysql/src/test/resources/expected_cloud_spec.json b/airbyte-integrations/connectors/source-mysql/src/test/resources/expected_cloud_spec.json index 66f0b3bdf647..4b16b191b592 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/resources/expected_cloud_spec.json +++ 
b/airbyte-integrations/connectors/source-mysql/src/test/resources/expected_cloud_spec.json @@ -213,7 +213,7 @@ "enum": ["Fail sync", "Re-sync data"], "default": "Fail sync", "order": 3, - "always_show": true + "always_show": tru } } }, diff --git a/airbyte-integrations/connectors/source-mysql/src/test/resources/expected_oss_spec.json b/airbyte-integrations/connectors/source-mysql/src/test/resources/expected_oss_spec.json index 78450b13aabd..a6e180137d99 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/resources/expected_oss_spec.json +++ b/airbyte-integrations/connectors/source-mysql/src/test/resources/expected_oss_spec.json @@ -219,7 +219,7 @@ "enum": ["Fail sync", "Re-sync data"], "default": "Fail sync", "order": 3, - "always_show": true + "always_show": tru } } }, From d00adb3aa3b9bf67ff57ccbd00bcf11c61915526 Mon Sep 17 00:00:00 2001 From: Akash Kulkarni Date: Mon, 19 Feb 2024 16:43:21 -0800 Subject: [PATCH 7/7] Fix specs --- .../src/test-integration/resources/expected_cloud_spec.json | 3 ++- .../src/test-integration/resources/expected_oss_spec.json | 3 ++- .../source-mysql/src/test/resources/expected_cloud_spec.json | 2 +- .../source-mysql/src/test/resources/expected_oss_spec.json | 2 +- 4 files changed, 6 insertions(+), 4 deletions(-) diff --git a/airbyte-integrations/connectors/source-mysql/src/test-integration/resources/expected_cloud_spec.json b/airbyte-integrations/connectors/source-mysql/src/test-integration/resources/expected_cloud_spec.json index 73a9c1485b81..871b7c0c38bb 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test-integration/resources/expected_cloud_spec.json +++ b/airbyte-integrations/connectors/source-mysql/src/test-integration/resources/expected_cloud_spec.json @@ -196,7 +196,8 @@ "description": "Determines whether Airbyte should fail or re-sync data in case of an stale/invalid cursor value into the WAL. 
If 'Fail sync' is chosen, a user will have to manually reset the connection before being able to continue syncing data. If 'Re-sync data' is chosen, Airbyte will automatically trigger a refresh but could lead to higher cloud costs and data loss.", "enum": ["Fail sync", "Re-sync data"], "default": "Fail sync", - "order": 3 + "order": 3, + "always_show": true } } }, diff --git a/airbyte-integrations/connectors/source-mysql/src/test-integration/resources/expected_oss_spec.json b/airbyte-integrations/connectors/source-mysql/src/test-integration/resources/expected_oss_spec.json index e29c8621cb92..7ffbbad5f718 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test-integration/resources/expected_oss_spec.json +++ b/airbyte-integrations/connectors/source-mysql/src/test-integration/resources/expected_oss_spec.json @@ -218,7 +218,8 @@ "description": "Determines whether Airbyte should fail or re-sync data in case of an stale/invalid cursor value into the WAL. If 'Fail sync' is chosen, a user will have to manually reset the connection before being able to continue syncing data. 
If 'Re-sync data' is chosen, Airbyte will automatically trigger a refresh but could lead to higher cloud costs and data loss.", "enum": ["Fail sync", "Re-sync data"], "default": "Fail sync", - "order": 3 + "order": 3, + "always_show": true } } }, diff --git a/airbyte-integrations/connectors/source-mysql/src/test/resources/expected_cloud_spec.json b/airbyte-integrations/connectors/source-mysql/src/test/resources/expected_cloud_spec.json index 4b16b191b592..66f0b3bdf647 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/resources/expected_cloud_spec.json +++ b/airbyte-integrations/connectors/source-mysql/src/test/resources/expected_cloud_spec.json @@ -213,7 +213,7 @@ "enum": ["Fail sync", "Re-sync data"], "default": "Fail sync", "order": 3, - "always_show": tru + "always_show": true } } }, diff --git a/airbyte-integrations/connectors/source-mysql/src/test/resources/expected_oss_spec.json b/airbyte-integrations/connectors/source-mysql/src/test/resources/expected_oss_spec.json index a6e180137d99..78450b13aabd 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/resources/expected_oss_spec.json +++ b/airbyte-integrations/connectors/source-mysql/src/test/resources/expected_oss_spec.json @@ -219,7 +219,7 @@ "enum": ["Fail sync", "Re-sync data"], "default": "Fail sync", "order": 3, - "always_show": tru + "always_show": true } } },