Skip to content

Commit

Permalink
[Source-mysql] : Add config to throw an error on invalid CDC position (
Browse files Browse the repository at this point in the history
  • Loading branch information
akashkulk authored Feb 20, 2024
1 parent d8bae3d commit 40a5edc
Show file tree
Hide file tree
Showing 12 changed files with 99 additions and 1 deletion.
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerImageTag: 3.3.7
dockerImageTag: 3.3.8
dockerRepository: airbyte/source-mysql
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
githubIssueLabel: source-mysql
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.mysql;

// Constants defined in
// airbyte-integrations/connectors/source-postgres/src/main/resources/spec.json.
public class MySqlSpecConstants {

public static final String INVALID_CDC_CURSOR_POSITION_PROPERTY = "invalid_cdc_cursor_position_behavior";
public static final String FAIL_SYNC_OPTION = "Fail sync";
public static final String RESYNC_DATA_OPTION = "Re-sync data";

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import static io.airbyte.cdk.db.DbAnalyticsUtils.cdcCursorInvalidMessage;
import static io.airbyte.integrations.source.mysql.MySqlQueryUtils.getTableSizeInfoForStreams;
import static io.airbyte.integrations.source.mysql.MySqlQueryUtils.prettyPrintConfiguredAirbyteStreamList;
import static io.airbyte.integrations.source.mysql.MySqlSpecConstants.FAIL_SYNC_OPTION;
import static io.airbyte.integrations.source.mysql.MySqlSpecConstants.INVALID_CDC_CURSOR_POSITION_PROPERTY;
import static io.airbyte.integrations.source.mysql.cdc.MysqlCdcStateConstants.MYSQL_CDC_OFFSET;
import static io.airbyte.integrations.source.mysql.initialsync.MySqlInitialLoadGlobalStateManager.STATE_TYPE_KEY;
import static io.airbyte.integrations.source.mysql.initialsync.MySqlInitialLoadStateManager.PRIMARY_KEY_STATE_TYPE;
Expand All @@ -25,6 +27,7 @@
import io.airbyte.cdk.integrations.source.relationaldb.TableInfo;
import io.airbyte.cdk.integrations.source.relationaldb.models.CdcState;
import io.airbyte.cdk.integrations.source.relationaldb.state.StateManager;
import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
Expand Down Expand Up @@ -111,6 +114,11 @@ public static List<AutoCloseableIterator<AirbyteMessage>> getCdcReadIterators(fi

if (!savedOffsetStillPresentOnServer) {
AirbyteTraceMessageUtility.emitAnalyticsTrace(cdcCursorInvalidMessage());
if (!sourceConfig.get("replication_method").has(INVALID_CDC_CURSOR_POSITION_PROPERTY) || sourceConfig.get("replication_method").get(
INVALID_CDC_CURSOR_POSITION_PROPERTY).asText().equals(FAIL_SYNC_OPTION)) {
throw new ConfigErrorException(
"Saved offset no longer present on the server. Please reset the connection, and then increase binlog retention or reduce sync frequency. See https://docs.airbyte.com/integrations/sources/mysql/mysql-troubleshooting#under-cdc-incremental-mode-there-are-still-full-refresh-syncs for more details.");
}
LOGGER.warn("Saved offset no longer present on the server, Airbyte is going to trigger a sync from scratch");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,15 @@
"description": "Enter the configured MySQL server timezone. This should only be done if the configured timezone in your MySQL instance does not conform to IANNA standard.",
"order": 2,
"always_show": true
},
"invalid_cdc_cursor_position_behavior": {
"type": "string",
"title": "Invalid CDC position behavior (Advanced)",
"description": "Determines whether Airbyte should fail or re-sync data in case of an stale/invalid cursor value into the WAL. If 'Fail sync' is chosen, a user will have to manually reset the connection before being able to continue syncing data. If 'Re-sync data' is chosen, Airbyte will automatically trigger a refresh but could lead to higher cloud costs and data loss.",
"enum": ["Fail sync", "Re-sync data"],
"default": "Fail sync",
"order": 3,
"always_show": true
}
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,15 @@
"description": "Enter the configured MySQL server timezone. This should only be done if the configured timezone in your MySQL instance does not conform to IANNA standard.",
"order": 2,
"always_show": true
},
"invalid_cdc_cursor_position_behavior": {
"type": "string",
"title": "Invalid CDC position behavior (Advanced)",
"description": "Determines whether Airbyte should fail or re-sync data in case of an stale/invalid cursor value into the WAL. If 'Fail sync' is chosen, a user will have to manually reset the connection before being able to continue syncing data. If 'Re-sync data' is chosen, Airbyte will automatically trigger a refresh but could lead to higher cloud costs and data loss.",
"enum": ["Fail sync", "Re-sync data"],
"default": "Fail sync",
"order": 3,
"always_show": true
}
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,15 @@
"description": "Enter the configured MySQL server timezone. This should only be done if the configured timezone in your MySQL instance does not conform to IANNA standard.",
"order": 2,
"always_show": true
},
"invalid_cdc_cursor_position_behavior": {
"type": "string",
"title": "Invalid CDC position behavior (Advanced)",
"description": "Determines whether Airbyte should fail or re-sync data in case of an stale/invalid cursor value into the WAL. If 'Fail sync' is chosen, a user will have to manually reset the connection before being able to continue syncing data. If 'Re-sync data' is chosen, Airbyte will automatically trigger a refresh but could lead to higher cloud costs and data loss.",
"enum": ["Fail sync", "Re-sync data"],
"default": "Fail sync",
"order": 3,
"always_show": true
}
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import static io.airbyte.integrations.source.mysql.MySqlSource.CDC_DEFAULT_CURSOR;
import static io.airbyte.integrations.source.mysql.MySqlSource.CDC_LOG_FILE;
import static io.airbyte.integrations.source.mysql.MySqlSource.CDC_LOG_POS;
import static io.airbyte.integrations.source.mysql.MySqlSpecConstants.FAIL_SYNC_OPTION;
import static io.airbyte.integrations.source.mysql.cdc.MysqlCdcStateConstants.IS_COMPRESSED;
import static io.airbyte.integrations.source.mysql.cdc.MysqlCdcStateConstants.MYSQL_CDC_OFFSET;
import static io.airbyte.integrations.source.mysql.cdc.MysqlCdcStateConstants.MYSQL_DB_HISTORY;
Expand All @@ -20,6 +21,7 @@
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.fasterxml.jackson.databind.JsonNode;
Expand All @@ -33,6 +35,7 @@
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.integrations.debezium.CdcSourceTest;
import io.airbyte.cdk.integrations.debezium.internals.AirbyteSchemaHistoryStorage;
import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
Expand Down Expand Up @@ -277,6 +280,12 @@ protected void syncShouldHandlePurgedLogsGracefully() throws Exception {
dataFromSecondBatch);
assertEquals((recordsToCreate * 2) + recordsCreatedBeforeTestCount, recordsFromSecondBatch.size(),
"Expected 46 records to be replicated in the second sync.");

JsonNode failSyncConfig = testdb.testConfigBuilder()
.withCdcReplication(FAIL_SYNC_OPTION)
.with(SYNC_CHECKPOINT_RECORDS_PROPERTY, 1)
.build();
assertThrows(ConfigErrorException.class, () -> source().read(failSyncConfig, getConfiguredCatalog(), state));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,15 @@
"description": "Enter the configured MySQL server timezone. This should only be done if the configured timezone in your MySQL instance does not conform to IANNA standard.",
"order": 2,
"always_show": true
},
"invalid_cdc_cursor_position_behavior": {
"type": "string",
"title": "Invalid CDC position behavior (Advanced)",
"description": "Determines whether Airbyte should fail or re-sync data in case of an stale/invalid cursor value into the WAL. If 'Fail sync' is chosen, a user will have to manually reset the connection before being able to continue syncing data. If 'Re-sync data' is chosen, Airbyte will automatically trigger a refresh but could lead to higher cloud costs and data loss.",
"enum": ["Fail sync", "Re-sync data"],
"default": "Fail sync",
"order": 3,
"always_show": true
}
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,15 @@
"description": "Enter the configured MySQL server timezone. This should only be done if the configured timezone in your MySQL instance does not conform to IANNA standard.",
"order": 2,
"always_show": true
},
"invalid_cdc_cursor_position_behavior": {
"type": "string",
"title": "Invalid CDC position behavior (Advanced)",
"description": "Determines whether Airbyte should fail or re-sync data in case of an stale/invalid cursor value into the WAL. If 'Fail sync' is chosen, a user will have to manually reset the connection before being able to continue syncing data. If 'Re-sync data' is chosen, Airbyte will automatically trigger a refresh but could lead to higher cloud costs and data loss.",
"enum": ["Fail sync", "Re-sync data"],
"default": "Fail sync",
"order": 3,
"always_show": true
}
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

package io.airbyte.integrations.source.mysql;

import static io.airbyte.integrations.source.mysql.MySqlSpecConstants.INVALID_CDC_CURSOR_POSITION_PROPERTY;
import static io.airbyte.integrations.source.mysql.MySqlSpecConstants.RESYNC_DATA_OPTION;

import com.google.common.collect.ImmutableMap;
import io.airbyte.cdk.db.factory.DatabaseDriver;
import io.airbyte.cdk.testutils.TestDatabase;
Expand Down Expand Up @@ -128,12 +131,17 @@ public MySQLConfigBuilder withStandardReplication() {
}

public MySQLConfigBuilder withCdcReplication() {
return withCdcReplication(RESYNC_DATA_OPTION);
}

public MySQLConfigBuilder withCdcReplication(String cdcCursorFailBehaviour) {
return this
.with("is_test", true)
.with("replication_method", ImmutableMap.builder()
.put("method", "CDC")
.put("initial_waiting_seconds", 5)
.put("server_time_zone", "America/Los_Angeles")
.put(INVALID_CDC_CURSOR_POSITION_PROPERTY, cdcCursorFailBehaviour)
.build());
}

Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ Any database or table encoding combination of charset and collation is supported

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------|
| 3.3.8 | 2024-02-20 | [35338](https://github.com/airbytehq/airbyte/pull/35338) | Add config to throw an error on invalid CDC position. |
| 3.3.7 | 2024-02-13 | [35036](https://github.com/airbytehq/airbyte/pull/34751) | Emit analytics message for invalid CDC cursor. |
| 3.3.6 | 2024-02-13 | [34869](https://github.com/airbytehq/airbyte/pull/34573) | Don't emit state in SourceStateIterator when there is an underlying stream failure. |
| 3.3.5 | 2024-02-12 | [34580](https://github.com/airbytehq/airbyte/pull/34580) | Support special chars in db name |
Expand Down
12 changes: 12 additions & 0 deletions docs/integrations/sources/mysql/mysql-troubleshooting.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,18 @@
* Amazon RDS MySQL or MariaDB connection issues: If you see the following `Cannot create a PoolableConnectionFactory` error, please add `enabledTLSProtocols=TLSv1.2` in the JDBC parameters.
* Amazon RDS MySQL connection issues: If you see `Error: HikariPool-1 - Connection is not available, request timed out after 30001ms.`, many times this due to your VPC not allowing public traffic. We recommend going through [this AWS troubleshooting checklist](https://aws.amazon.com/premiumsupport/knowledge-center/rds-cannot-connect/) to ensure the correct permissions/settings have been granted to allow Airbyte to connect to your database.

### Under CDC incremental mode, there are still full refresh syncs

Normally under the CDC mode, the MySQL source will first run a full refresh sync to read the snapshot of all the existing data, and all subsequent runs will only be incremental syncs reading from the binlogs. However, occasionally, you may see full refresh syncs after the initial run. When this happens, you will see the following log:

> Saved offset no longer present on the server, Airbyte is going to trigger a sync from scratch
The root causes is that the binglogs needed for the incremental sync have been removed by MySQL. This can occur under the following scenarios:

- When there are lots of database updates resulting in more WAL files than allowed in the `pg_wal` directory, Postgres will purge or archive the WAL files. This scenario is preventable. Possible solutions include:
- Sync the data source more frequently.
- Set a higher `binlog_expire_logs_seconds`. It's recommended to set this value to a time period of 7 days. See detailed documentation [here](https://dev.mysql.com/doc/refman/8.0/en/replication-options-binary-log.html#sysvar_binlog_expire_logs_seconds). The downside of this approach is that more disk space will be needed.

### EventDataDeserializationException errors during initial snapshot

When a sync runs for the first time using CDC, Airbyte performs an initial consistent snapshot of your database. Airbyte doesn't acquire any table locks \(for tables defined with MyISAM engine, the tables would still be locked\) while creating the snapshot to allow writes by other database clients. But in order for the sync to work without any error/unexpected behaviour, it is assumed that no schema changes are happening while the snapshot is running.
Expand Down

0 comments on commit 40a5edc

Please sign in to comment.