Skip to content

Commit

Permalink
Set isResumeable flag in catalog (#38584)
Browse files Browse the repository at this point in the history
Co-authored-by: Dhroov Makwana <[email protected]>
Co-authored-by: Alexandre Girard <[email protected]>
Co-authored-by: btkcodedev <[email protected]>
Co-authored-by: Gireesh Sreepathi <[email protected]>
Co-authored-by: Yue Li <[email protected]>
Co-authored-by: Augustin <[email protected]>
Co-authored-by: Natik Gadzhi <[email protected]>
Co-authored-by: Danylo Jablonski <[email protected]>
Co-authored-by: Maxime Carbonneau-Leclerc <[email protected]>
Co-authored-by: Oleksandr Bazarnov <[email protected]>
Co-authored-by: Ben Church <[email protected]>
Co-authored-by: alafanechere <[email protected]>
Co-authored-by: Christo Grabowski <[email protected]>
Co-authored-by: Edward Gao <[email protected]>
Co-authored-by: Catherine Noll <[email protected]>
Co-authored-by: Audrey Maldonado <[email protected]>
Co-authored-by: Rodi Reich Zilberman <[email protected]>
Co-authored-by: Serhii Lazebnyi <[email protected]>
Co-authored-by: Serhii Lazebnyi <[email protected]>
  • Loading branch information
20 people authored May 30, 2024
1 parent 06ac9a0 commit 2d194fa
Show file tree
Hide file tree
Showing 34 changed files with 201 additions and 44 deletions.
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.36.2
version=0.36.3
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,11 @@ import io.airbyte.commons.util.AutoCloseableIterator
import io.airbyte.commons.util.AutoCloseableIterators
import io.airbyte.protocol.models.CommonField
import io.airbyte.protocol.models.JsonSchemaType
import io.airbyte.protocol.models.v0.AirbyteCatalog
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.AirbyteStream
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair
import io.airbyte.protocol.models.v0.CatalogHelpers
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
import io.airbyte.protocol.models.v0.SyncMode
Expand Down Expand Up @@ -106,6 +109,21 @@ abstract class AbstractJdbcSource<Datatype>(
return false
}

override fun discover(config: JsonNode): AirbyteCatalog {
var catalog = super.discover(config)
var database = createDatabase(config)
catalog.streams.forEach(
Consumer { stream: AirbyteStream ->
stream.isResumable =
supportResumableFullRefresh(
database,
CatalogHelpers.toDefaultConfiguredStream(stream)
)
}
)
return catalog
}

open fun getInitialLoadHandler(
database: JdbcDatabase,
airbyteStream: ConfiguredAirbyteStream,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import io.airbyte.commons.json.Jsons
import io.airbyte.commons.util.MoreIterators
import io.airbyte.protocol.models.Field
import io.airbyte.protocol.models.JsonSchemaType
import io.airbyte.protocol.models.v0.AirbyteCatalog
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.AirbyteStateMessage
import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage
Expand Down Expand Up @@ -67,6 +68,60 @@ internal class DefaultJdbcSourceAcceptanceTest :
return true
}

// Default test source does not support RFR.
public override fun supportResumeableFullRefreshWithoutPk(): Boolean? {
return false
}

override fun getCatalog(defaultNamespace: String?): AirbyteCatalog {
return AirbyteCatalog()
.withStreams(
mutableListOf(
CatalogHelpers.createAirbyteStream(
TABLE_NAME,
defaultNamespace,
Field.of(COL_ID, JsonSchemaType.INTEGER),
Field.of(COL_NAME, JsonSchemaType.STRING),
Field.of(COL_UPDATED_AT, JsonSchemaType.STRING),
)
.withSupportedSyncModes(
java.util.List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL),
)
.withSourceDefinedPrimaryKey(java.util.List.of(java.util.List.of(COL_ID)))
.withIsResumable(false),
CatalogHelpers.createAirbyteStream(
TABLE_NAME_WITHOUT_PK,
defaultNamespace,
Field.of(COL_ID, JsonSchemaType.INTEGER),
Field.of(COL_NAME, JsonSchemaType.STRING),
Field.of(COL_UPDATED_AT, JsonSchemaType.STRING),
)
.withSupportedSyncModes(
java.util.List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL),
)
.withSourceDefinedPrimaryKey(emptyList())
.withIsResumable(false),
CatalogHelpers.createAirbyteStream(
TABLE_NAME_COMPOSITE_PK,
defaultNamespace,
Field.of(COL_FIRST_NAME, JsonSchemaType.STRING),
Field.of(COL_LAST_NAME, JsonSchemaType.STRING),
Field.of(COL_UPDATED_AT, JsonSchemaType.STRING),
)
.withSupportedSyncModes(
java.util.List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL),
)
.withSourceDefinedPrimaryKey(
java.util.List.of(
java.util.List.of(COL_FIRST_NAME),
java.util.List.of(COL_LAST_NAME),
),
)
.withIsResumable(false),
),
)
}

fun getConfigWithConnectionProperties(
psqlDb: PostgreSQLContainer<*>,
dbName: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {

protected abstract fun addCdcMetadataColumns(stream: AirbyteStream?)

protected abstract fun addIsResumableFlagForNonPkTable(stream: AirbyteStream?)

protected abstract fun addCdcDefaultCursorField(stream: AirbyteStream?)

protected abstract fun assertExpectedStateMessages(stateMessages: List<AirbyteStateMessage>)
Expand Down Expand Up @@ -1457,6 +1459,7 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
val streams = expectedCatalog.streams
// stream with PK
streams[0].sourceDefinedCursor = true
streams[0].isResumable = true
addCdcMetadataColumns(streams[0])
addCdcDefaultCursorField(streams[0])

Expand All @@ -1472,6 +1475,7 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
streamWithoutPK.supportedSyncModes = java.util.List.of(SyncMode.FULL_REFRESH)
addCdcDefaultCursorField(streamWithoutPK)
addCdcMetadataColumns(streamWithoutPK)
addIsResumableFlagForNonPkTable(streamWithoutPK)

val randomStream =
CatalogHelpers.createAirbyteStream(
Expand All @@ -1488,6 +1492,7 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
.withSourceDefinedPrimaryKey(
java.util.List.of(java.util.List.of(COL_ID + "_random")),
)
.withIsResumable(true)

addCdcDefaultCursorField(randomStream)
addCdcMetadataColumns(randomStream)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,10 @@ abstract class JdbcSourceAcceptanceTest<S : Source, T : TestDatabase<*, T, *>> {
}
}

protected open fun supportResumeableFullRefreshWithoutPk(): Boolean? {
return false
}

@Test
@Throws(Exception::class)
protected fun testDiscoverWithMultipleSchemas() {
Expand Down Expand Up @@ -379,7 +383,8 @@ abstract class JdbcSourceAcceptanceTest<S : Source, T : TestDatabase<*, T, *>> {
)
.withSupportedSyncModes(
java.util.List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL),
),
)
.withIsResumable(supportResumeableFullRefreshWithoutPk()),
)
expected.streams = catalogStreams
// sort streams by name so that we are comparing lists with the same order.
Expand Down
2 changes: 1 addition & 1 deletion airbyte-cdk/java/airbyte-cdk/dependencies/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ dependencies {
api 'com.fasterxml.jackson.module:jackson-module-kotlin'
api 'com.google.guava:guava:33.0.0-jre'
api 'commons-io:commons-io:2.15.1'
api ('io.airbyte.airbyte-protocol:protocol-models:0.9.0') { exclude group: 'com.google.api-client', module: 'google-api-client' }
api ('io.airbyte.airbyte-protocol:protocol-models:0.11.0') { exclude group: 'com.google.api-client', module: 'google-api-client' }
api 'javax.annotation:javax.annotation-api:1.3.2'
api 'org.apache.commons:commons-compress:1.25.0'
api 'org.apache.commons:commons-lang3:3.14.0'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.31.5'
cdkVersionRequired = '0.36.3'
features = ['db-sources', 'datastore-mongo']
useLocalCdk = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e
dockerImageTag: 1.3.13
dockerImageTag: 1.3.14
dockerRepository: airbyte/source-mongodb-v2
documentationUrl: https://docs.airbyte.com/integrations/sources/mongodb-v2
githubIssueLabel: source-mongodb-v2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ public static List<AirbyteStream> getAirbyteStreams(final MongoClient mongoClien
.map(collectionName -> discoverFields(collectionName, mongoClient, databaseName, sampleSize, isSchemaEnforced))
.filter(Optional::isPresent)
.map(Optional::get)
.map(stream -> stream.withIsResumable(true))
.collect(Collectors.toList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ void testDiscoverOperation() throws IOException {
assertEquals(List.of(DEFAULT_CURSOR_FIELD), stream.get().getDefaultCursorField());
assertEquals(List.of(List.of(MongoCatalogHelper.DEFAULT_PRIMARY_KEY)), stream.get().getSourceDefinedPrimaryKey());
assertEquals(MongoCatalogHelper.SUPPORTED_SYNC_MODES, stream.get().getSupportedSyncModes());
assertEquals(true, stream.get().getIsResumable());
}

@Test
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mssql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.36.2'
cdkVersionRequired = '0.36.3'
features = ['db-sources']
useLocalCdk = false
}
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mssql/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
dockerImageTag: 4.0.26
dockerImageTag: 4.0.27
dockerRepository: airbyte/source-mssql
documentationUrl: https://docs.airbyte.com/integrations/sources/mssql
githubIssueLabel: source-mssql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ public Set<String> getExcludedInternalNameSpaces() {
}

@Override
public AirbyteCatalog discover(final JsonNode config) throws Exception {
public AirbyteCatalog discover(final JsonNode config) {
final AirbyteCatalog catalog = super.discover(config);

if (MssqlCdcHelper.isCdc(config)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,11 @@ public void newTableSnapshotTest() {
// Do nothing
}

@Override
protected void addIsResumableFlagForNonPkTable(final AirbyteStream stream) {
stream.setIsResumable(false);
}

// Utilize the setup to do test on MssqlDebeziumStateUtil.
@Test
public void testCdcSnapshot() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,15 +385,17 @@ protected AirbyteCatalog getCatalog(final String defaultNamespace) {
Field.of(COL_NAME, JsonSchemaType.STRING),
Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE))
.withSupportedSyncModes(List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of(COL_ID))),
.withSourceDefinedPrimaryKey(List.of(List.of(COL_ID)))
.withIsResumable(true),
CatalogHelpers.createAirbyteStream(
TABLE_NAME_WITHOUT_PK,
defaultNamespace,
Field.of(COL_ID, JsonSchemaType.INTEGER),
Field.of(COL_NAME, JsonSchemaType.STRING),
Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE))
.withSupportedSyncModes(List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(Collections.emptyList()),
.withSourceDefinedPrimaryKey(Collections.emptyList())
.withIsResumable(false),
CatalogHelpers.createAirbyteStream(
TABLE_NAME_COMPOSITE_PK,
defaultNamespace,
Expand All @@ -402,7 +404,8 @@ protected AirbyteCatalog getCatalog(final String defaultNamespace) {
Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE))
.withSupportedSyncModes(List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(
List.of(List.of(COL_FIRST_NAME), List.of(COL_LAST_NAME)))));
List.of(List.of(COL_FIRST_NAME), List.of(COL_LAST_NAME)))
.withIsResumable(true)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ class MssqlSourceTest {
Field.of("name", JsonSchemaType.STRING),
Field.of("born", JsonSchemaType.STRING_TIMESTAMP_WITH_TIMEZONE))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of("id")))));
.withSourceDefinedPrimaryKey(List.of(List.of("id")))
.withIsResumable(true)));

private MsSQLTestDatabase testdb;

Expand Down Expand Up @@ -69,14 +70,21 @@ private JsonNode getConfig() {
// the column twice. we now de-duplicate it (pr: https://github.com/airbytehq/airbyte/pull/983).
// this tests that this de-duplication is successful.
@Test
void testDiscoverWithPk() throws Exception {
void testDiscoverWithPk() {
testdb
.with("ALTER TABLE id_and_name ADD CONSTRAINT i3pk PRIMARY KEY CLUSTERED (id);")
.with("CREATE INDEX i1 ON id_and_name (id);");
final AirbyteCatalog actual = source().discover(getConfig());
assertEquals(CATALOG, actual);
}

@Test
void testDiscoverWithoutPk() {
final AirbyteCatalog actual = source().discover(getConfig());
assertEquals(STREAM_NAME, actual.getStreams().get(0).getName());
assertEquals(false, actual.getStreams().get(0).getIsResumable());
}

@Test
@Disabled("See https://github.com/airbytehq/airbyte/pull/23908#issuecomment-1463753684, enable once communication is out")
public void testTableWithNullCursorValueShouldThrowException() throws Exception {
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.36.2'
cdkVersionRequired = '0.36.3'
features = ['db-sources']
useLocalCdk = false
}
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerImageTag: 3.4.6
dockerImageTag: 3.4.7
dockerRepository: airbyte/source-mysql
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
githubIssueLabel: source-mysql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ public List<CheckedConsumer<JdbcDatabase, Exception>> getCheckOperations(final J
}

@Override
public AirbyteCatalog discover(final JsonNode config) throws Exception {
public AirbyteCatalog discover(final JsonNode config) {
final AirbyteCatalog catalog = super.discover(config);

if (isCdc(config)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,11 @@ protected boolean supportResumableFullRefresh() {
return true;
}

@Override
protected void addIsResumableFlagForNonPkTable(final AirbyteStream stream) {
stream.setIsResumable(false);
}

@Test
protected void syncWithReplicationClientPrivilegeRevokedFailsCheck() throws Exception {
testdb.with("REVOKE REPLICATION CLIENT ON *.* FROM %s@'%%';", testdb.getUserName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -473,15 +473,17 @@ protected AirbyteCatalog getCatalog(final String defaultNamespace) {
Field.of(COL_NAME, JsonSchemaType.STRING),
Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of(COL_ID))),
.withSourceDefinedPrimaryKey(List.of(List.of(COL_ID)))
.withIsResumable(true),
CatalogHelpers.createAirbyteStream(
TABLE_NAME_WITHOUT_PK,
defaultNamespace,
Field.of(COL_ID, JsonSchemaType.INTEGER),
Field.of(COL_NAME, JsonSchemaType.STRING),
Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(Collections.emptyList()),
.withSourceDefinedPrimaryKey(Collections.emptyList())
.withIsResumable(false),
CatalogHelpers.createAirbyteStream(
TABLE_NAME_COMPOSITE_PK,
defaultNamespace,
Expand All @@ -490,7 +492,8 @@ protected AirbyteCatalog getCatalog(final String defaultNamespace) {
Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(
List.of(List.of(COL_FIRST_NAME), List.of(COL_LAST_NAME)))));
List.of(List.of(COL_FIRST_NAME), List.of(COL_LAST_NAME)))
.withIsResumable(true)));
}

// Override from parent class as we're no longer including the legacy Data field.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ java {
}

airbyteJavaConnector {
cdkVersionRequired = '0.36.2'
cdkVersionRequired = '0.36.3'
features = ['db-sources', 'datastore-postgres']
useLocalCdk = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerImageTag: 3.4.9
dockerImageTag: 3.4.10
dockerRepository: airbyte/source-postgres
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
githubIssueLabel: source-postgres
Expand Down
Loading

0 comments on commit 2d194fa

Please sign in to comment.