From eb2e0b731eb08e537212673926a9015243420842 Mon Sep 17 00:00:00 2001 From: Akash Kulkarni <113392464+akashkulk@users.noreply.github.com> Date: Tue, 20 Feb 2024 14:46:44 -0800 Subject: [PATCH] [Source-Mongodb] : Add config to throw an error on invalid CDC position (#35375) --- .../integration_tests/expected_spec.json | 9 +++++ .../source-mongodb-v2/metadata.yaml | 2 +- .../source/mongodb/MongoConstants.java | 4 ++ .../source/mongodb/MongoDbSourceConfig.java | 11 +++++ .../mongodb/cdc/MongoDbCdcInitializer.java | 5 +++ .../src/main/resources/spec.json | 9 +++++ .../mongodb/MongoDbSourceAcceptanceTest.java | 11 ++--- .../cdc/MongoDbCdcInitializerTest.java | 40 ++++++++++++++++++- docs/integrations/sources/mongodb-v2.md | 3 +- 9 files changed, 82 insertions(+), 12 deletions(-) diff --git a/airbyte-integrations/connectors/source-mongodb-v2/integration_tests/expected_spec.json b/airbyte-integrations/connectors/source-mongodb-v2/integration_tests/expected_spec.json index e77e9d632780..bd7f8b04829e 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/integration_tests/expected_spec.json +++ b/airbyte-integrations/connectors/source-mongodb-v2/integration_tests/expected_spec.json @@ -167,6 +167,15 @@ "minimum": 10, "maximum": 100000, "group": "advanced" + }, + "invalid_cdc_cursor_position_behavior": { + "type": "string", + "title": "Invalid CDC position behavior (Advanced)", + "description": "Determines whether Airbyte should fail or re-sync data in case of an stale/invalid cursor value into the WAL. If 'Fail sync' is chosen, a user will have to manually reset the connection before being able to continue syncing data. If 'Re-sync data' is chosen, Airbyte will automatically trigger a refresh but could lead to higher cloud costs and data loss.", + "enum": ["Fail sync", "Re-sync data"], + "default": "Fail sync", + "order": 11, + "group": "advanced" } }, "groups": [ diff --git a/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml b/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml index cde2b8488af3..6d9c000b4537 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml +++ b/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml @@ -5,7 +5,7 @@ data: connectorSubtype: database connectorType: source definitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e - dockerImageTag: 1.2.10 + dockerImageTag: 1.2.11 dockerRepository: airbyte/source-mongodb-v2 documentationUrl: https://docs.airbyte.com/integrations/sources/mongodb-v2 githubIssueLabel: source-mongodb-v2 diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoConstants.java b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoConstants.java index 430fa9f9c409..6fb2bc792b19 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoConstants.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoConstants.java @@ -37,6 +37,10 @@ public class MongoConstants { public static final String INITIAL_RECORD_WAITING_TIME_SEC = "initial_waiting_seconds"; public static final Integer DEFAULT_INITIAL_RECORD_WAITING_TIME_SEC = 300; + public static final String INVALID_CDC_CURSOR_POSITION_PROPERTY = "invalid_cdc_cursor_position_behavior"; + public static final String FAIL_SYNC_OPTION = "Fail sync"; + public static final String RESYNC_DATA_OPTION = "Re-sync data"; + private MongoConstants() {} } diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoDbSourceConfig.java b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoDbSourceConfig.java index a03647bb9386..8f3f572afad2 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoDbSourceConfig.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoDbSourceConfig.java @@ -14,7 +14,9 @@ import static io.airbyte.integrations.source.mongodb.MongoConstants.DEFAULT_INITIAL_RECORD_WAITING_TIME_SEC; import static io.airbyte.integrations.source.mongodb.MongoConstants.DISCOVER_SAMPLE_SIZE_CONFIGURATION_KEY; import static io.airbyte.integrations.source.mongodb.MongoConstants.INITIAL_RECORD_WAITING_TIME_SEC; +import static io.airbyte.integrations.source.mongodb.MongoConstants.INVALID_CDC_CURSOR_POSITION_PROPERTY; import static io.airbyte.integrations.source.mongodb.MongoConstants.PASSWORD_CONFIGURATION_KEY; +import static io.airbyte.integrations.source.mongodb.MongoConstants.RESYNC_DATA_OPTION; import static io.airbyte.integrations.source.mongodb.MongoConstants.SCHEMA_ENFORCED_CONFIGURATION_KEY; import static io.airbyte.integrations.source.mongodb.MongoConstants.USERNAME_CONFIGURATION_KEY; @@ -96,4 +98,13 @@ public Integer getInitialWaitingTimeSeconds() { } } + public boolean shouldFailSyncOnInvalidCursor() { + if (rawConfig.has(INVALID_CDC_CURSOR_POSITION_PROPERTY) + && rawConfig.get(INVALID_CDC_CURSOR_POSITION_PROPERTY).asText().equals(RESYNC_DATA_OPTION)) { + return false; + } else { + return true; + } + } + } diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java index 8b73e6ab3f40..279eb053f3f2 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java @@ -12,6 +12,7 @@ import com.mongodb.client.MongoDatabase; import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility; import io.airbyte.cdk.integrations.debezium.AirbyteDebeziumHandler; +import io.airbyte.commons.exceptions.ConfigErrorException; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.util.AutoCloseableIterator; import io.airbyte.commons.util.AutoCloseableIterators; @@ -116,6 +117,10 @@ public List> createCdcIterators( if (!savedOffsetIsValid) { AirbyteTraceMessageUtility.emitAnalyticsTrace(cdcCursorInvalidMessage()); + if (config.shouldFailSyncOnInvalidCursor()) { + throw new ConfigErrorException( + "Saved offset is not valid. Please reset the connection, and then increase oplog retention or reduce sync frequency to prevent his from happening in the future. See https://docs.airbyte.com/integrations/sources/mongodb-v2#mongodb-oplog-and-change-streams for more details"); + } LOGGER.info("Saved offset is not valid. Airbyte will trigger a full refresh."); // If the offset in the state is invalid, reset the state to the initial STATE stateManager.resetState(new MongoDbCdcState(initialDebeziumState, config.getEnforceSchema())); diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/main/resources/spec.json b/airbyte-integrations/connectors/source-mongodb-v2/src/main/resources/spec.json index 9c4af4f046c9..07a7268b7158 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/main/resources/spec.json @@ -167,6 +167,15 @@ "minimum": 10, "maximum": 100000, "group": "advanced" + }, + "invalid_cdc_cursor_position_behavior": { + "type": "string", + "title": "Invalid CDC position behavior (Advanced)", + "description": "Determines whether Airbyte should fail or re-sync data in case of an stale/invalid cursor value into the WAL. If 'Fail sync' is chosen, a user will have to manually reset the connection before being able to continue syncing data. If 'Re-sync data' is chosen, Airbyte will automatically trigger a refresh but could lead to higher cloud costs and data loss.", + "enum": ["Fail sync", "Re-sync data"], + "default": "Fail sync", + "order": 11, + "group": "advanced" } }, "groups": [ diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/source/mongodb/MongoDbSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/source/mongodb/MongoDbSourceAcceptanceTest.java index fa220e4386db..5b7703b85c81 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/source/mongodb/MongoDbSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/source/mongodb/MongoDbSourceAcceptanceTest.java @@ -9,6 +9,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -508,14 +509,8 @@ void testSyncShouldHandlePurgedLogsGracefully() throws Exception { stateMessage.getGlobal().setSharedState(Jsons.jsonNode(cdcState)); final JsonNode state = Jsons.jsonNode(List.of(stateMessage)); - // Re-run the sync to prove that an initial snapshot is initiated due to invalid resume token - final List messages2 = runRead(configuredCatalog, state); - - final List recordMessages2 = filterRecords(messages2); - final List stateMessages2 = filterStateMessages(messages2); - - assertEquals(recordCount, recordMessages2.size()); - assertEquals(recordCount + 1, stateMessages2.size()); + // Re-run the sync to prove that a config error is thrown due to invalid resume token + assertThrows(Exception.class, () -> runRead(configuredCatalog, state)); } @Test diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializerTest.java b/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializerTest.java index 9b87de23c1a3..7e0ea6eaa222 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializerTest.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializerTest.java @@ -5,6 +5,8 @@ package io.airbyte.integrations.source.mongodb.cdc; import static io.airbyte.integrations.source.mongodb.MongoConstants.DATABASE_CONFIG_CONFIGURATION_KEY; +import static io.airbyte.integrations.source.mongodb.MongoConstants.INVALID_CDC_CURSOR_POSITION_PROPERTY; +import static io.airbyte.integrations.source.mongodb.MongoConstants.RESYNC_DATA_OPTION; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -20,6 +22,7 @@ import static org.mockito.Mockito.when; import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.ImmutableMap; import com.mongodb.MongoCommandException; import com.mongodb.ServerAddress; import com.mongodb.client.AggregateIterable; @@ -205,20 +208,53 @@ void testCreateCdcIteratorsFromInitialStateWithCompletedInitialSnapshot() { } @Test - void testCreateCdcIteratorsWithCompletedInitialSnapshotSavedOffsetInvalid() { + void testCreateCdcIteratorsWithCompletedInitialSnapshotSavedOffsetInvalidDefaultBehavior() { + when(changeStreamIterable.cursor()) + .thenReturn(mongoChangeStreamCursor) + .thenThrow(new MongoCommandException(new BsonDocument(), new ServerAddress())) + .thenReturn(mongoChangeStreamCursor); + final MongoDbStateManager stateManager = MongoDbStateManager.createStateManager(createInitialDebeziumState(InitialSnapshotStatus.COMPLETE)); + assertThrows(ConfigErrorException.class, () -> cdcInitializer.createCdcIterators(mongoClient, cdcConnectorMetadataInjector, CONFIGURED_CATALOG, + stateManager, EMITTED_AT, CONFIG)); + } + + @Test + void testCreateCdcIteratorsWithCompletedInitialSnapshotSavedOffsetFailOption() { + when(changeStreamIterable.cursor()) + .thenReturn(mongoChangeStreamCursor) + .thenThrow(new MongoCommandException(new BsonDocument(), new ServerAddress())) + .thenReturn(mongoChangeStreamCursor); + final MongoDbStateManager stateManager = MongoDbStateManager.createStateManager(createInitialDebeziumState(InitialSnapshotStatus.COMPLETE)); + assertThrows(ConfigErrorException.class, () -> cdcInitializer.createCdcIterators(mongoClient, cdcConnectorMetadataInjector, CONFIGURED_CATALOG, + stateManager, EMITTED_AT, CONFIG)); + } + + @Test + void testCreateCdcIteratorsWithCompletedInitialSnapshotSavedOffsetInvalidResyncOption() { + MongoDbSourceConfig resyncConfig = new MongoDbSourceConfig(createConfig(RESYNC_DATA_OPTION)); when(changeStreamIterable.cursor()) .thenReturn(mongoChangeStreamCursor) .thenThrow(new MongoCommandException(new BsonDocument(), new ServerAddress())) .thenReturn(mongoChangeStreamCursor); final MongoDbStateManager stateManager = MongoDbStateManager.createStateManager(createInitialDebeziumState(InitialSnapshotStatus.COMPLETE)); final List> iterators = cdcInitializer - .createCdcIterators(mongoClient, cdcConnectorMetadataInjector, CONFIGURED_CATALOG, stateManager, EMITTED_AT, CONFIG); + .createCdcIterators(mongoClient, cdcConnectorMetadataInjector, CONFIGURED_CATALOG, stateManager, EMITTED_AT, resyncConfig); assertNotNull(iterators); assertEquals(2, iterators.size(), "Should always have 2 iterators: 1 for the initial snapshot and 1 for the cdc stream"); assertTrue(iterators.get(0).hasNext(), "Initial snapshot iterator should at least have one message if its snapshot state is set as complete but needs to start over due to invalid saved offset"); } + JsonNode createConfig(String cdcCursorFailBehaviour) { + return Jsons.jsonNode(ImmutableMap.builder() + .put(DATABASE_CONFIG_CONFIGURATION_KEY, + Map.of( + MongoDbDebeziumConstants.Configuration.CONNECTION_STRING_CONFIGURATION_KEY, "mongodb://host:12345/", + MongoDbDebeziumConstants.Configuration.DATABASE_CONFIGURATION_KEY, DATABASE)) + .put(INVALID_CDC_CURSOR_POSITION_PROPERTY, cdcCursorFailBehaviour) + .build()); + } + @Test void testUnableToExtractOffsetFromStateException() { final MongoDbStateManager stateManager = MongoDbStateManager.createStateManager(createInitialDebeziumState(InitialSnapshotStatus.COMPLETE)); diff --git a/docs/integrations/sources/mongodb-v2.md b/docs/integrations/sources/mongodb-v2.md index 4515d39997fb..0df7d0f12a97 100644 --- a/docs/integrations/sources/mongodb-v2.md +++ b/docs/integrations/sources/mongodb-v2.md @@ -214,7 +214,8 @@ For more information regarding configuration parameters, please see [MongoDb Doc | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:----------------------------------------------------------------------------------------------------------| -| 1.2.10 | 2024-02-13 | [35036](https://github.com/airbytehq/airbyte/pull/34751) | Emit analytics message for invalid CDC cursor. | +| 1.2.11 | 2024-02-20 | [35375](https://github.com/airbytehq/airbyte/pull/35375) | Add config to throw an error on invalid CDC position and enable it by default. | +| 1.2.10 | 2024-02-13 | [35036](https://github.com/airbytehq/airbyte/pull/34751) | Emit analytics message for invalid CDC cursor. | | 1.2.9 | 2024-02-13 | [35114](https://github.com/airbytehq/airbyte/pull/35114) | Extend subsequent cdc record wait time to the duration of initial. Bug Fixes | | 1.2.8 | 2024-02-08 | [34748](https://github.com/airbytehq/airbyte/pull/34748) | Adopt CDK 0.19.0 | | 1.2.7 | 2024-02-01 | [34759](https://github.com/airbytehq/airbyte/pull/34759) | Fail sync if initial snapshot for any stream fails. |