diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index 663e5832f1db..829ba2e6d14f 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.36.2 \ No newline at end of file +version=0.36.3 \ No newline at end of file diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.kt index 50ed17d71eec..3615f0225546 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.kt @@ -55,8 +55,11 @@ import io.airbyte.commons.util.AutoCloseableIterator import io.airbyte.commons.util.AutoCloseableIterators import io.airbyte.protocol.models.CommonField import io.airbyte.protocol.models.JsonSchemaType +import io.airbyte.protocol.models.v0.AirbyteCatalog import io.airbyte.protocol.models.v0.AirbyteMessage +import io.airbyte.protocol.models.v0.AirbyteStream import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair +import io.airbyte.protocol.models.v0.CatalogHelpers import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream import io.airbyte.protocol.models.v0.SyncMode @@ -106,6 +109,21 @@ abstract class AbstractJdbcSource( return false } + override fun discover(config: JsonNode): AirbyteCatalog { + var catalog = super.discover(config) + var database = createDatabase(config) + catalog.streams.forEach( + Consumer { stream: AirbyteStream -> + stream.isResumable = + supportResumableFullRefresh( + database, + CatalogHelpers.toDefaultConfiguredStream(stream) + ) + } + ) + return catalog + } + open fun getInitialLoadHandler( database: JdbcDatabase, airbyteStream: ConfiguredAirbyteStream, diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/jdbc/DefaultJdbcSourceAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/jdbc/DefaultJdbcSourceAcceptanceTest.kt index dc8086bafbbe..f8ab9ed8846c 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/jdbc/DefaultJdbcSourceAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/jdbc/DefaultJdbcSourceAcceptanceTest.kt @@ -21,6 +21,7 @@ import io.airbyte.commons.json.Jsons import io.airbyte.commons.util.MoreIterators import io.airbyte.protocol.models.Field import io.airbyte.protocol.models.JsonSchemaType +import io.airbyte.protocol.models.v0.AirbyteCatalog import io.airbyte.protocol.models.v0.AirbyteMessage import io.airbyte.protocol.models.v0.AirbyteStateMessage import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage @@ -67,6 +68,60 @@ internal class DefaultJdbcSourceAcceptanceTest : return true } + // Default test source does not support RFR. + public override fun supportResumeableFullRefreshWithoutPk(): Boolean? { + return false + } + + override fun getCatalog(defaultNamespace: String?): AirbyteCatalog { + return AirbyteCatalog() + .withStreams( + mutableListOf( + CatalogHelpers.createAirbyteStream( + TABLE_NAME, + defaultNamespace, + Field.of(COL_ID, JsonSchemaType.INTEGER), + Field.of(COL_NAME, JsonSchemaType.STRING), + Field.of(COL_UPDATED_AT, JsonSchemaType.STRING), + ) + .withSupportedSyncModes( + java.util.List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL), + ) + .withSourceDefinedPrimaryKey(java.util.List.of(java.util.List.of(COL_ID))) + .withIsResumable(false), + CatalogHelpers.createAirbyteStream( + TABLE_NAME_WITHOUT_PK, + defaultNamespace, + Field.of(COL_ID, JsonSchemaType.INTEGER), + Field.of(COL_NAME, JsonSchemaType.STRING), + Field.of(COL_UPDATED_AT, JsonSchemaType.STRING), + ) + .withSupportedSyncModes( + java.util.List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL), + ) + .withSourceDefinedPrimaryKey(emptyList()) + .withIsResumable(false), + CatalogHelpers.createAirbyteStream( + TABLE_NAME_COMPOSITE_PK, + defaultNamespace, + Field.of(COL_FIRST_NAME, JsonSchemaType.STRING), + Field.of(COL_LAST_NAME, JsonSchemaType.STRING), + Field.of(COL_UPDATED_AT, JsonSchemaType.STRING), + ) + .withSupportedSyncModes( + java.util.List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL), + ) + .withSourceDefinedPrimaryKey( + java.util.List.of( + java.util.List.of(COL_FIRST_NAME), + java.util.List.of(COL_LAST_NAME), + ), + ) + .withIsResumable(false), + ), + ) + } + fun getConfigWithConnectionProperties( psqlDb: PostgreSQLContainer<*>, dbName: String, diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/debezium/CdcSourceTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/debezium/CdcSourceTest.kt index d1e181c84e2a..6782df6399e4 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/debezium/CdcSourceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/debezium/CdcSourceTest.kt @@ -101,6 +101,8 @@ abstract class CdcSourceTest> { protected abstract fun addCdcMetadataColumns(stream: AirbyteStream?) + protected abstract fun addIsResumableFlagForNonPkTable(stream: AirbyteStream?) + protected abstract fun addCdcDefaultCursorField(stream: AirbyteStream?) protected abstract fun assertExpectedStateMessages(stateMessages: List) @@ -1457,6 +1459,7 @@ abstract class CdcSourceTest> { val streams = expectedCatalog.streams // stream with PK streams[0].sourceDefinedCursor = true + streams[0].isResumable = true addCdcMetadataColumns(streams[0]) addCdcDefaultCursorField(streams[0]) @@ -1472,6 +1475,7 @@ abstract class CdcSourceTest> { streamWithoutPK.supportedSyncModes = java.util.List.of(SyncMode.FULL_REFRESH) addCdcDefaultCursorField(streamWithoutPK) addCdcMetadataColumns(streamWithoutPK) + addIsResumableFlagForNonPkTable(streamWithoutPK) val randomStream = CatalogHelpers.createAirbyteStream( @@ -1488,6 +1492,7 @@ abstract class CdcSourceTest> { .withSourceDefinedPrimaryKey( java.util.List.of(java.util.List.of(COL_ID + "_random")), ) + .withIsResumable(true) addCdcDefaultCursorField(randomStream) addCdcMetadataColumns(randomStream) diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.kt index 20749a40cdc9..4d682520b278 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.kt @@ -335,6 +335,10 @@ abstract class JdbcSourceAcceptanceTest> { } } + protected open fun supportResumeableFullRefreshWithoutPk(): Boolean? { + return false + } + @Test @Throws(Exception::class) protected fun testDiscoverWithMultipleSchemas() { @@ -379,7 +383,8 @@ abstract class JdbcSourceAcceptanceTest> { ) .withSupportedSyncModes( java.util.List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL), - ), + ) + .withIsResumable(supportResumeableFullRefreshWithoutPk()), ) expected.streams = catalogStreams // sort streams by name so that we are comparing lists with the same order. diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/build.gradle b/airbyte-cdk/java/airbyte-cdk/dependencies/build.gradle index 8f527f3fe41e..428d19774c9f 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/build.gradle +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/build.gradle @@ -18,7 +18,7 @@ dependencies { api 'com.fasterxml.jackson.module:jackson-module-kotlin' api 'com.google.guava:guava:33.0.0-jre' api 'commons-io:commons-io:2.15.1' - api ('io.airbyte.airbyte-protocol:protocol-models:0.9.0') { exclude group: 'com.google.api-client', module: 'google-api-client' } + api ('io.airbyte.airbyte-protocol:protocol-models:0.11.0') { exclude group: 'com.google.api-client', module: 'google-api-client' } api 'javax.annotation:javax.annotation-api:1.3.2' api 'org.apache.commons:commons-compress:1.25.0' api 'org.apache.commons:commons-lang3:3.14.0' diff --git a/airbyte-integrations/connectors/source-mongodb-v2/build.gradle b/airbyte-integrations/connectors/source-mongodb-v2/build.gradle index 1c668bc22b49..f027ee29f6ca 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/build.gradle +++ b/airbyte-integrations/connectors/source-mongodb-v2/build.gradle @@ -3,7 +3,7 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.31.5' + cdkVersionRequired = '0.36.3' features = ['db-sources', 'datastore-mongo'] useLocalCdk = false } diff --git a/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml b/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml index e24f66f36762..256fd3cc2fb8 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml +++ b/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml @@ -8,7 +8,7 @@ data: connectorSubtype: database connectorType: source definitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e - dockerImageTag: 1.3.13 + dockerImageTag: 1.3.14 dockerRepository: airbyte/source-mongodb-v2 documentationUrl: https://docs.airbyte.com/integrations/sources/mongodb-v2 githubIssueLabel: source-mongodb-v2 diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoUtil.java b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoUtil.java index dae57c9052d4..b2bffab3cbd3 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoUtil.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoUtil.java @@ -140,6 +140,7 @@ public static List getAirbyteStreams(final MongoClient mongoClien .map(collectionName -> discoverFields(collectionName, mongoClient, databaseName, sampleSize, isSchemaEnforced)) .filter(Optional::isPresent) .map(Optional::get) + .map(stream -> stream.withIsResumable(true)) .collect(Collectors.toList()); } diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/MongoDbSourceTest.java b/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/MongoDbSourceTest.java index 76cf614d04d9..b1bc2f484c2a 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/MongoDbSourceTest.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/MongoDbSourceTest.java @@ -229,6 +229,7 @@ void testDiscoverOperation() throws IOException { assertEquals(List.of(DEFAULT_CURSOR_FIELD), stream.get().getDefaultCursorField()); assertEquals(List.of(List.of(MongoCatalogHelper.DEFAULT_PRIMARY_KEY)), stream.get().getSourceDefinedPrimaryKey()); assertEquals(MongoCatalogHelper.SUPPORTED_SYNC_MODES, stream.get().getSupportedSyncModes()); + assertEquals(true, stream.get().getIsResumable()); } @Test diff --git a/airbyte-integrations/connectors/source-mssql/build.gradle b/airbyte-integrations/connectors/source-mssql/build.gradle index 779f91f0018c..3d8286357a7e 100644 --- a/airbyte-integrations/connectors/source-mssql/build.gradle +++ b/airbyte-integrations/connectors/source-mssql/build.gradle @@ -3,7 +3,7 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.36.2' + cdkVersionRequired = '0.36.3' features = ['db-sources'] useLocalCdk = false } diff --git a/airbyte-integrations/connectors/source-mssql/metadata.yaml b/airbyte-integrations/connectors/source-mssql/metadata.yaml index f4d2e0b263e5..f47fbae81d09 100644 --- a/airbyte-integrations/connectors/source-mssql/metadata.yaml +++ b/airbyte-integrations/connectors/source-mssql/metadata.yaml @@ -9,7 +9,7 @@ data: connectorSubtype: database connectorType: source definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1 - dockerImageTag: 4.0.26 + dockerImageTag: 4.0.27 dockerRepository: airbyte/source-mssql documentationUrl: https://docs.airbyte.com/integrations/sources/mssql githubIssueLabel: source-mssql diff --git a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java index 20c75afb8832..9ac42c0eaabc 100644 --- a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java +++ b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java @@ -216,7 +216,7 @@ public Set getExcludedInternalNameSpaces() { } @Override - public AirbyteCatalog discover(final JsonNode config) throws Exception { + public AirbyteCatalog discover(final JsonNode config) { final AirbyteCatalog catalog = super.discover(config); if (MssqlCdcHelper.isCdc(config)) { diff --git a/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceTest.java b/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceTest.java index d258f29c1cff..f7483abea0d6 100644 --- a/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceTest.java +++ b/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceTest.java @@ -185,6 +185,11 @@ public void newTableSnapshotTest() { // Do nothing } + @Override + protected void addIsResumableFlagForNonPkTable(final AirbyteStream stream) { + stream.setIsResumable(false); + } + // Utilize the setup to do test on MssqlDebeziumStateUtil. @Test public void testCdcSnapshot() { diff --git a/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlJdbcSourceAcceptanceTest.java index fd29d7ab400b..f35ee12c8c65 100644 --- a/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlJdbcSourceAcceptanceTest.java @@ -385,7 +385,8 @@ protected AirbyteCatalog getCatalog(final String defaultNamespace) { Field.of(COL_NAME, JsonSchemaType.STRING), Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE)) .withSupportedSyncModes(List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) - .withSourceDefinedPrimaryKey(List.of(List.of(COL_ID))), + .withSourceDefinedPrimaryKey(List.of(List.of(COL_ID))) + .withIsResumable(true), CatalogHelpers.createAirbyteStream( TABLE_NAME_WITHOUT_PK, defaultNamespace, @@ -393,7 +394,8 @@ protected AirbyteCatalog getCatalog(final String defaultNamespace) { Field.of(COL_NAME, JsonSchemaType.STRING), Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE)) .withSupportedSyncModes(List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) - .withSourceDefinedPrimaryKey(Collections.emptyList()), + .withSourceDefinedPrimaryKey(Collections.emptyList()) + .withIsResumable(false), CatalogHelpers.createAirbyteStream( TABLE_NAME_COMPOSITE_PK, defaultNamespace, @@ -402,7 +404,8 @@ protected AirbyteCatalog getCatalog(final String defaultNamespace) { Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE)) .withSupportedSyncModes(List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) .withSourceDefinedPrimaryKey( - List.of(List.of(COL_FIRST_NAME), List.of(COL_LAST_NAME))))); + List.of(List.of(COL_FIRST_NAME), List.of(COL_LAST_NAME))) + .withIsResumable(true))); } @Override diff --git a/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlSourceTest.java b/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlSourceTest.java index 098ebab0ef41..38a8b554a1f0 100644 --- a/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlSourceTest.java +++ b/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlSourceTest.java @@ -35,7 +35,8 @@ class MssqlSourceTest { Field.of("name", JsonSchemaType.STRING), Field.of("born", JsonSchemaType.STRING_TIMESTAMP_WITH_TIMEZONE)) .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) - .withSourceDefinedPrimaryKey(List.of(List.of("id"))))); + .withSourceDefinedPrimaryKey(List.of(List.of("id"))) + .withIsResumable(true))); private MsSQLTestDatabase testdb; @@ -69,7 +70,7 @@ private JsonNode getConfig() { // the column twice. we now de-duplicate it (pr: https://github.com/airbytehq/airbyte/pull/983). // this tests that this de-duplication is successful. @Test - void testDiscoverWithPk() throws Exception { + void testDiscoverWithPk() { testdb .with("ALTER TABLE id_and_name ADD CONSTRAINT i3pk PRIMARY KEY CLUSTERED (id);") .with("CREATE INDEX i1 ON id_and_name (id);"); @@ -77,6 +78,13 @@ void testDiscoverWithPk() throws Exception { assertEquals(CATALOG, actual); } + @Test + void testDiscoverWithoutPk() { + final AirbyteCatalog actual = source().discover(getConfig()); + assertEquals(STREAM_NAME, actual.getStreams().get(0).getName()); + assertEquals(false, actual.getStreams().get(0).getIsResumable()); + } + @Test @Disabled("See https://github.com/airbytehq/airbyte/pull/23908#issuecomment-1463753684, enable once communication is out") public void testTableWithNullCursorValueShouldThrowException() throws Exception { diff --git a/airbyte-integrations/connectors/source-mysql/build.gradle b/airbyte-integrations/connectors/source-mysql/build.gradle index e325fbf84579..23cce83557d7 100644 --- a/airbyte-integrations/connectors/source-mysql/build.gradle +++ b/airbyte-integrations/connectors/source-mysql/build.gradle @@ -6,7 +6,7 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.36.2' + cdkVersionRequired = '0.36.3' features = ['db-sources'] useLocalCdk = false } diff --git a/airbyte-integrations/connectors/source-mysql/metadata.yaml b/airbyte-integrations/connectors/source-mysql/metadata.yaml index d199f866c38f..cceab94a08bb 100644 --- a/airbyte-integrations/connectors/source-mysql/metadata.yaml +++ b/airbyte-integrations/connectors/source-mysql/metadata.yaml @@ -9,7 +9,7 @@ data: connectorSubtype: database connectorType: source definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad - dockerImageTag: 3.4.6 + dockerImageTag: 3.4.7 dockerRepository: airbyte/source-mysql documentationUrl: https://docs.airbyte.com/integrations/sources/mysql githubIssueLabel: source-mysql diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java index d4fb037c8e02..217d85bab022 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java @@ -291,7 +291,7 @@ public List> getCheckOperations(final J } @Override - public AirbyteCatalog discover(final JsonNode config) throws Exception { + public AirbyteCatalog discover(final JsonNode config) { final AirbyteCatalog catalog = super.discover(config); if (isCdc(config)) { diff --git a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java index fd397230ad89..365218b49933 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java @@ -245,6 +245,11 @@ protected boolean supportResumableFullRefresh() { return true; } + @Override + protected void addIsResumableFlagForNonPkTable(final AirbyteStream stream) { + stream.setIsResumable(false); + } + @Test protected void syncWithReplicationClientPrivilegeRevokedFailsCheck() throws Exception { testdb.with("REVOKE REPLICATION CLIENT ON *.* FROM %s@'%%';", testdb.getUserName()); diff --git a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlJdbcSourceAcceptanceTest.java index 742994efb05d..d1ec0aacb680 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlJdbcSourceAcceptanceTest.java @@ -473,7 +473,8 @@ protected AirbyteCatalog getCatalog(final String defaultNamespace) { Field.of(COL_NAME, JsonSchemaType.STRING), Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE)) .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) - .withSourceDefinedPrimaryKey(List.of(List.of(COL_ID))), + .withSourceDefinedPrimaryKey(List.of(List.of(COL_ID))) + .withIsResumable(true), CatalogHelpers.createAirbyteStream( TABLE_NAME_WITHOUT_PK, defaultNamespace, @@ -481,7 +482,8 @@ protected AirbyteCatalog getCatalog(final String defaultNamespace) { Field.of(COL_NAME, JsonSchemaType.STRING), Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE)) .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) - .withSourceDefinedPrimaryKey(Collections.emptyList()), + .withSourceDefinedPrimaryKey(Collections.emptyList()) + .withIsResumable(false), CatalogHelpers.createAirbyteStream( TABLE_NAME_COMPOSITE_PK, defaultNamespace, @@ -490,7 +492,8 @@ protected AirbyteCatalog getCatalog(final String defaultNamespace) { Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE)) .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) .withSourceDefinedPrimaryKey( - List.of(List.of(COL_FIRST_NAME), List.of(COL_LAST_NAME))))); + List.of(List.of(COL_FIRST_NAME), List.of(COL_LAST_NAME))) + .withIsResumable(true))); } // Override from parent class as we're no longer including the legacy Data field. diff --git a/airbyte-integrations/connectors/source-postgres/build.gradle b/airbyte-integrations/connectors/source-postgres/build.gradle index 7f0bae33248e..902dbb055594 100644 --- a/airbyte-integrations/connectors/source-postgres/build.gradle +++ b/airbyte-integrations/connectors/source-postgres/build.gradle @@ -12,7 +12,7 @@ java { } airbyteJavaConnector { - cdkVersionRequired = '0.36.2' + cdkVersionRequired = '0.36.3' features = ['db-sources', 'datastore-postgres'] useLocalCdk = false } diff --git a/airbyte-integrations/connectors/source-postgres/metadata.yaml b/airbyte-integrations/connectors/source-postgres/metadata.yaml index 9ba3ae541df7..8164fda60846 100644 --- a/airbyte-integrations/connectors/source-postgres/metadata.yaml +++ b/airbyte-integrations/connectors/source-postgres/metadata.yaml @@ -9,7 +9,7 @@ data: connectorSubtype: database connectorType: source definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750 - dockerImageTag: 3.4.9 + dockerImageTag: 3.4.10 dockerRepository: airbyte/source-postgres documentationUrl: https://docs.airbyte.com/integrations/sources/postgres githubIssueLabel: source-postgres diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java index 09ada3cab413..94c7f6992002 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java @@ -286,7 +286,7 @@ protected void logPreSyncDebugData(final JdbcDatabase database, final Configured @Override @Trace(operationName = DISCOVER_TRACE_OPERATION_NAME) - public AirbyteCatalog discover(final JsonNode config) throws Exception { + public AirbyteCatalog discover(final JsonNode config) { final AirbyteCatalog catalog = super.discover(config); if (isCdc(config)) { @@ -830,11 +830,13 @@ protected void initializeForStateManager(final JdbcDatabase database, public boolean supportResumableFullRefresh(final JdbcDatabase database, final ConfiguredAirbyteStream airbyteStream) { // finalListOfStreamsToBeSyncedViaCtid will be initialized as part of state manager initialization // for non CDC only. - if (!ctidStateManager.getFileNodeHandler().hasFileNode(new io.airbyte.protocol.models.AirbyteStreamNameNamespacePair( - airbyteStream.getStream().getName(), airbyteStream.getStream().getNamespace()))) { - LOGGER.info("stream " + airbyteStream + " will not sync in resumeable full refresh mode."); - return false; - + // ctidStateManager will only be initialized in read operation. It will not be there for discover. + if (ctidStateManager != null) { + if (!ctidStateManager.getFileNodeHandler().hasFileNode(new io.airbyte.protocol.models.AirbyteStreamNameNamespacePair( + airbyteStream.getStream().getName(), airbyteStream.getStream().getNamespace()))) { + LOGGER.info("stream " + airbyteStream + " will not sync in resumeable full refresh mode."); + return false; + } } final FileNodeHandler fileNodeHandler = diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java index 4f6a01f90257..14ef8038eff7 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java @@ -118,6 +118,11 @@ protected JsonNode config() { .build(); } + @Override + protected void addIsResumableFlagForNonPkTable(final AirbyteStream stream) { + stream.setIsResumable(true); + } + @Override @BeforeEach protected void setup() { diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java index 1bd837081a38..8848075d5668 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java @@ -298,7 +298,8 @@ protected AirbyteCatalog getCatalog(final String defaultNamespace) { Field.of(COL_LAST_VISITED_AT, JsonSchemaType.STRING_TIMESTAMP_WITH_TIMEZONE), Field.of(COL_LAST_COMMENT_AT, JsonSchemaType.STRING_TIMESTAMP_WITHOUT_TIMEZONE)) .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) - .withSourceDefinedPrimaryKey(List.of(List.of(COL_ID))), + .withSourceDefinedPrimaryKey(List.of(List.of(COL_ID))) + .withIsResumable(true), CatalogHelpers.createAirbyteStream( TABLE_NAME_WITHOUT_PK, defaultNamespace, @@ -309,7 +310,8 @@ protected AirbyteCatalog getCatalog(final String defaultNamespace) { Field.of(COL_LAST_VISITED_AT, JsonSchemaType.STRING_TIMESTAMP_WITH_TIMEZONE), Field.of(COL_LAST_COMMENT_AT, JsonSchemaType.STRING_TIMESTAMP_WITHOUT_TIMEZONE)) .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) - .withSourceDefinedPrimaryKey(Collections.emptyList()), + .withSourceDefinedPrimaryKey(Collections.emptyList()) + .withIsResumable(true), CatalogHelpers.createAirbyteStream( TABLE_NAME_COMPOSITE_PK, defaultNamespace, @@ -321,7 +323,8 @@ protected AirbyteCatalog getCatalog(final String defaultNamespace) { Field.of(COL_LAST_COMMENT_AT, JsonSchemaType.STRING_TIMESTAMP_WITHOUT_TIMEZONE)) .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) .withSourceDefinedPrimaryKey( - List.of(List.of(COL_FIRST_NAME), List.of(COL_LAST_NAME))))); + List.of(List.of(COL_FIRST_NAME), List.of(COL_LAST_NAME))) + .withIsResumable(true))); } @Override @@ -710,4 +713,9 @@ protected JsonNode getStateData(final AirbyteMessage airbyteMessage, final Strin throw new IllegalArgumentException("Stream not found in state message: " + streamName); } + @Override + protected Boolean supportResumeableFullRefreshWithoutPk() { + return true; + } + } diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceSSLTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceSSLTest.java index 7a246017ff97..02b507c81b16 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceSSLTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceSSLTest.java @@ -53,14 +53,16 @@ class PostgresSourceSSLTest { Field.of("name", JsonSchemaType.STRING), Field.of("power", JsonSchemaType.NUMBER)) .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) - .withSourceDefinedPrimaryKey(List.of(List.of("id"))), + .withSourceDefinedPrimaryKey(List.of(List.of("id"))) + .withIsResumable(true), CatalogHelpers.createAirbyteStream( STREAM_NAME + "2", SCHEMA_NAME, Field.of("id", JsonSchemaType.NUMBER), Field.of("name", JsonSchemaType.STRING), Field.of("power", JsonSchemaType.NUMBER)) - .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)), + .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) + .withIsResumable(true), CatalogHelpers.createAirbyteStream( "names", SCHEMA_NAME, @@ -68,7 +70,8 @@ class PostgresSourceSSLTest { Field.of("last_name", JsonSchemaType.STRING), Field.of("power", JsonSchemaType.NUMBER)) .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) - .withSourceDefinedPrimaryKey(List.of(List.of("first_name"), List.of("last_name"))))); + .withSourceDefinedPrimaryKey(List.of(List.of("first_name"), List.of("last_name"))) + .withIsResumable(true))); private static final ConfiguredAirbyteCatalog CONFIGURED_CATALOG = CatalogHelpers.toDefaultConfiguredCatalog(CATALOG); private static final Set ASCII_MESSAGES = Sets.newHashSet( createRecord(STREAM_NAME, map("id", new BigDecimal("1.0"), "name", "goku", "power", null), SCHEMA_NAME), diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java index 813ac0aeba7d..b0954e55862a 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java @@ -79,14 +79,16 @@ class PostgresSourceTest { Field.of("name", JsonSchemaType.STRING), Field.of("power", JsonSchemaType.NUMBER)) .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) - .withSourceDefinedPrimaryKey(List.of(List.of("id"))), + .withSourceDefinedPrimaryKey(List.of(List.of("id"))) + .withIsResumable(true), CatalogHelpers.createAirbyteStream( STREAM_NAME + "2", SCHEMA_NAME, Field.of("id", JsonSchemaType.NUMBER), Field.of("name", JsonSchemaType.STRING), Field.of("power", JsonSchemaType.NUMBER)) - .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)), + .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) + .withIsResumable(true), CatalogHelpers.createAirbyteStream( "names", SCHEMA_NAME, @@ -94,21 +96,24 @@ class PostgresSourceTest { Field.of("last_name", JsonSchemaType.STRING), Field.of("power", JsonSchemaType.NUMBER)) .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) - .withSourceDefinedPrimaryKey(List.of(List.of("first_name"), List.of("last_name"))), + .withSourceDefinedPrimaryKey(List.of(List.of("first_name"), List.of("last_name"))) + .withIsResumable(true), CatalogHelpers.createAirbyteStream( STREAM_NAME_PRIVILEGES_TEST_CASE, SCHEMA_NAME, Field.of("id", JsonSchemaType.NUMBER), Field.of("name", JsonSchemaType.STRING)) .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) - .withSourceDefinedPrimaryKey(List.of(List.of("id"))), + .withSourceDefinedPrimaryKey(List.of(List.of("id"))) + .withIsResumable(true), CatalogHelpers.createAirbyteStream( STREAM_NAME_PRIVILEGES_TEST_CASE_VIEW, SCHEMA_NAME, Field.of("id", JsonSchemaType.NUMBER), Field.of("name", JsonSchemaType.STRING)) .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) - .withSourceDefinedPrimaryKey(List.of(List.of("id"))))); + .withSourceDefinedPrimaryKey(List.of(List.of("id"))) + .withIsResumable(true))); private static final ConfiguredAirbyteCatalog CONFIGURED_CATALOG = CatalogHelpers.toDefaultConfiguredCatalog(CATALOG); private static final ConfiguredAirbyteCatalog CONFIGURED_INCR_CATALOG = toIncrementalConfiguredCatalog(CATALOG); @@ -297,6 +302,17 @@ void testDiscoverWithPk() throws Exception { }); } + @Test + void testDiscoverWithViewShouldNotBeResumeable() throws Exception { + final ConfiguredAirbyteStream viewStream = createViewWithNullValueCursor(testdb.getDatabase()); + final AirbyteCatalog actual = source().discover(getConfig()); + + final Optional actualStream = + actual.getStreams().stream().filter(stream -> stream.getName().equals(viewStream.getStream().getName())).findAny(); + assertTrue(actualStream.isPresent()); + assertEquals(actualStream.get().getIsResumable(), false); + } + @Test void testDiscoverRecursiveRolePermissions() throws Exception { testdb.query(ctx -> { @@ -897,7 +913,8 @@ CREATE VIEW test_view_null_cursor(id) as SCHEMA_NAME, Field.of("id", JsonSchemaType.STRING)) .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) - .withSourceDefinedPrimaryKey(List.of(List.of("id")))); + .withSourceDefinedPrimaryKey(List.of(List.of("id"))) + .withIsResumable(false)); } diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/XminPostgresSourceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/XminPostgresSourceTest.java index c2088e5985c2..2aeae74804b0 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/XminPostgresSourceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/XminPostgresSourceTest.java @@ -59,7 +59,8 @@ class XminPostgresSourceTest { Field.of("power", JsonSchemaType.NUMBER)) .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) .withSourceDefinedCursor(true) - .withSourceDefinedPrimaryKey(List.of(List.of("id"))), + .withSourceDefinedPrimaryKey(List.of(List.of("id"))) + .withIsResumable(true), CatalogHelpers.createAirbyteStream( STREAM_NAME + "2", SCHEMA_NAME, @@ -67,7 +68,8 @@ class XminPostgresSourceTest { Field.of("name", JsonSchemaType.STRING), Field.of("power", JsonSchemaType.NUMBER)) .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) - .withSourceDefinedCursor(true), + .withSourceDefinedCursor(true) + .withIsResumable(true), CatalogHelpers.createAirbyteStream( "names", SCHEMA_NAME, @@ -76,7 +78,8 @@ class XminPostgresSourceTest { Field.of("power", JsonSchemaType.NUMBER)) .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) .withSourceDefinedCursor(true) - .withSourceDefinedPrimaryKey(List.of(List.of("first_name"), List.of("last_name"))))); + .withSourceDefinedPrimaryKey(List.of(List.of("first_name"), List.of("last_name"))) + .withIsResumable(true))); protected static final ConfiguredAirbyteCatalog CONFIGURED_XMIN_CATALOG = toConfiguredXminCatalog(CATALOG); diff --git a/docs/integrations/sources/mongodb-v2-migrations.md b/docs/integrations/sources/mongodb-v2-migrations.md new file mode 100644 index 000000000000..1dde3de1c75b --- /dev/null +++ b/docs/integrations/sources/mongodb-v2-migrations.md @@ -0,0 +1,5 @@ +# MongoDb Migration Guide + +## Upgrading to 1.0.0 +MongoDB now supports incremental syncs. Alternatively you can also choose to use full refresh if your DB has lots of updates but in relatively +small volume. \ No newline at end of file diff --git a/docs/integrations/sources/mongodb-v2.md b/docs/integrations/sources/mongodb-v2.md index 4e9fe1140263..b4aad6ea0708 100644 --- a/docs/integrations/sources/mongodb-v2.md +++ b/docs/integrations/sources/mongodb-v2.md @@ -195,7 +195,8 @@ For more information regarding configuration parameters, please see [MongoDb Doc ## Changelog | Version | Date | Pull Request | Subject | -| :------ | :--------- | :------------------------------------------------------- | :-------------------------------------------------------------------------------------------------------- | +|:--------| :--------- | :------------------------------------------------------- | :-------------------------------------------------------------------------------------------------------- | +| 1.3.14 | 2024-05-29 | [38584](https://github.com/airbytehq/airbyte/pull/38584) | Set is_resumable flag in discover. | | 1.3.13 | 2024-05-09 | [36851](https://github.com/airbytehq/airbyte/pull/36851) | Support reading collection with a binary \_id type. | | 1.3.12 | 2024-05-07 | [36851](https://github.com/airbytehq/airbyte/pull/36851) | Upgrade debezium to version 2.5.1. | | 1.3.11 | 2024-05-02 | [37753](https://github.com/airbytehq/airbyte/pull/37753) | Chunk size(limit) should correspond to ~1GB of data. | diff --git a/docs/integrations/sources/mssql.md b/docs/integrations/sources/mssql.md index 98a68de27655..6895be6c5399 100644 --- a/docs/integrations/sources/mssql.md +++ b/docs/integrations/sources/mssql.md @@ -419,6 +419,7 @@ WHERE actor_definition_id ='b5ea17b1-f170-46dc-bc31-cc744ca984c1' AND (configura | Version | Date | Pull Request | Subject | |:--------|:-----------|:------------------------------------------------------------------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------| +| 4.0.27 | 2024-05-29 | [38584](https://github.com/airbytehq/airbyte/pull/38584) | Set is_resumable flag in discover. | | 4.0.26 | 2024-05-16 | [38292](https://github.com/airbytehq/airbyte/pull/38292) | Improve cursor value query to return only one row | | 4.0.25 | 2024-05-29 | [38775](https://github.com/airbytehq/airbyte/pull/38775) | Publish CDK | | 4.0.24 | 2024-05-23 | [38640](https://github.com/airbytehq/airbyte/pull/38640) | Sync sending trace status messages indicating progress. | diff --git a/docs/integrations/sources/mysql.md b/docs/integrations/sources/mysql.md index f72a6573f6dd..8d3d8078cb13 100644 --- a/docs/integrations/sources/mysql.md +++ b/docs/integrations/sources/mysql.md @@ -230,7 +230,8 @@ Any database or table encoding combination of charset and collation is supported | Version | Date | Pull Request | Subject | |:--------|:-----------| :--------------------------------------------------------- |:------------------------------------------------------------------------------------------------------------------------------------------------| -| 3.4.6 | 2024-05-29 | [38538](https://github.com/airbytehq/airbyte/pull/38538) | Exit connector when encountering a config error. | +| 3.4.7 | 2024-05-29 | [38584](https://github.com/airbytehq/airbyte/pull/38584) | Set is_resumable flag in discover. | +| 3.4.6 | 2024-05-29 | [38538](https://github.com/airbytehq/airbyte/pull/38538) | Exit connector when encountering a config error. | | 3.4.5 | 2024-05-23 | [38198](https://github.com/airbytehq/airbyte/pull/38198) | Sync sending trace status messages indicating progress. | | 3.4.4 | 2024-05-15 | [38208](https://github.com/airbytehq/airbyte/pull/38208) | disable counts in full refresh stream in state message. | | 3.4.3 | 2024-05-13 | [38104](https://github.com/airbytehq/airbyte/pull/38104) | Handle transient error messages. | diff --git a/docs/integrations/sources/postgres.md b/docs/integrations/sources/postgres.md index 1fc167ce079c..d9bf1893fa5a 100644 --- a/docs/integrations/sources/postgres.md +++ b/docs/integrations/sources/postgres.md @@ -309,7 +309,9 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp | Version | Date | Pull Request | Subject | |---------|------------|----------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 3.4.10 | 2024-05-29 | [38584](https://github.com/airbytehq/airbyte/pull/38584) | Set is_resumable flag in discover. | | 3.4.9 | 2024-05-29 | [38775](https://github.com/airbytehq/airbyte/pull/38775) | Publish CDK | +| 3.4.9 | 2024-05-28 | [38716](https://github.com/airbytehq/airbyte/pull/38716) | Publish CDK | | 3.4.8 | 2024-05-28 | [38716](https://github.com/airbytehq/airbyte/pull/38716) | Stream status for postgres | | 3.4.7 | 2024-05-20 | [38365](https://github.com/airbytehq/airbyte/pull/38365) | Rollback a previously version (3.4.6) | | 3.4.5 | 2024-05-16 | [38303](https://github.com/airbytehq/airbyte/pull/38303) | Streams not in the CDC publication still have a cursor and PK. |