Skip to content

Commit

Permalink
Resumable Full Refresh sync for mssql (#37451)
Browse files Browse the repository at this point in the history
Co-authored-by: Xiaohan Song <[email protected]>
  • Loading branch information
rodireich and xiaohansong authored May 7, 2024
1 parent 395effc commit 5432fab
Show file tree
Hide file tree
Showing 24 changed files with 598 additions and 381 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ internal class DefaultJdbcSourceAcceptanceTest :
JdbcSourceAcceptanceTest<
DefaultJdbcSourceAcceptanceTest.PostgresTestSource, BareBonesTestDatabase>() {
override fun config(): JsonNode {
return testdb.testConfigBuilder().build()
return testdb?.testConfigBuilder()?.build()!!
}

override fun source(): PostgresTestSource {
Expand Down Expand Up @@ -181,12 +181,15 @@ internal class DefaultJdbcSourceAcceptanceTest :
fun testCustomParametersOverwriteDefaultParametersExpectException() {
val connectionPropertiesUrl = "ssl=false"
val config =
getConfigWithConnectionProperties(
PSQL_CONTAINER,
testdb.databaseName,
connectionPropertiesUrl
)
val customParameters = parseJdbcParameters(config, JdbcUtils.CONNECTION_PROPERTIES_KEY, "&")
testdb?.let {
getConfigWithConnectionProperties(
PSQL_CONTAINER,
it.databaseName,
connectionPropertiesUrl
)
}
val customParameters =
parseJdbcParameters(config!!, JdbcUtils.CONNECTION_PROPERTIES_KEY, "&")
val defaultParameters = mapOf("ssl" to "true", "sslmode" to "require")
Assertions.assertThrows(IllegalArgumentException::class.java) {
JdbcDataSourceUtils.assertCustomParametersDontOverwriteDefaultParameters(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import org.mockito.Mockito
"The static variables are updated in subclasses for convenience, and cannot be final."
)
abstract class JdbcSourceAcceptanceTest<S : Source, T : TestDatabase<*, T, *>> {
@JvmField protected var testdb: T = createTestDatabase()
@JvmField protected var testdb: T? = null

protected fun streamName(): String {
return TABLE_NAME
Expand Down Expand Up @@ -120,60 +120,60 @@ abstract class JdbcSourceAcceptanceTest<S : Source, T : TestDatabase<*, T, *>> {
testdb!!.with("ALTER SESSION SET NLS_DATE_FORMAT = 'YYYY-MM-DD'")
}
testdb
.with(
?.with(
createTableQuery(
getFullyQualifiedTableName(TABLE_NAME),
COLUMN_CLAUSE_WITH_PK,
primaryKeyClause(listOf("id"))
)
)
.with(
?.with(
"INSERT INTO %s(id, name, updated_at) VALUES (1, 'picard', '2004-10-19')",
getFullyQualifiedTableName(TABLE_NAME)
)
.with(
?.with(
"INSERT INTO %s(id, name, updated_at) VALUES (2, 'crusher', '2005-10-19')",
getFullyQualifiedTableName(TABLE_NAME)
)
.with(
?.with(
"INSERT INTO %s(id, name, updated_at) VALUES (3, 'vash', '2006-10-19')",
getFullyQualifiedTableName(TABLE_NAME)
)
.with(
?.with(
createTableQuery(
getFullyQualifiedTableName(TABLE_NAME_WITHOUT_PK),
COLUMN_CLAUSE_WITHOUT_PK,
""
)
)
.with(
?.with(
"INSERT INTO %s(id, name, updated_at) VALUES (1, 'picard', '2004-10-19')",
getFullyQualifiedTableName(TABLE_NAME_WITHOUT_PK)
)
.with(
?.with(
"INSERT INTO %s(id, name, updated_at) VALUES (2, 'crusher', '2005-10-19')",
getFullyQualifiedTableName(TABLE_NAME_WITHOUT_PK)
)
.with(
?.with(
"INSERT INTO %s(id, name, updated_at) VALUES (3, 'vash', '2006-10-19')",
getFullyQualifiedTableName(TABLE_NAME_WITHOUT_PK)
)
.with(
?.with(
createTableQuery(
getFullyQualifiedTableName(TABLE_NAME_COMPOSITE_PK),
COLUMN_CLAUSE_WITH_COMPOSITE_PK,
primaryKeyClause(listOf("first_name", "last_name"))
)
)
.with(
?.with(
"INSERT INTO %s(first_name, last_name, updated_at) VALUES ('first', 'picard', '2004-10-19')",
getFullyQualifiedTableName(TABLE_NAME_COMPOSITE_PK)
)
.with(
?.with(
"INSERT INTO %s(first_name, last_name, updated_at) VALUES ('second', 'crusher', '2005-10-19')",
getFullyQualifiedTableName(TABLE_NAME_COMPOSITE_PK)
)
.with(
?.with(
"INSERT INTO %s(first_name, last_name, updated_at) VALUES ('third', 'vash', '2006-10-19')",
getFullyQualifiedTableName(TABLE_NAME_COMPOSITE_PK)
)
Expand Down Expand Up @@ -774,11 +774,11 @@ abstract class JdbcSourceAcceptanceTest<S : Source, T : TestDatabase<*, T, *>> {

protected open fun executeStatementReadIncrementallyTwice() {
testdb
.with(
?.with(
"INSERT INTO %s (id, name, updated_at) VALUES (4, 'riker', '2006-10-19')",
getFullyQualifiedTableName(TABLE_NAME)
)
.with(
?.with(
"INSERT INTO %s (id, name, updated_at) VALUES (5, 'data', '2006-10-19')",
getFullyQualifiedTableName(TABLE_NAME)
)
Expand Down
4 changes: 2 additions & 2 deletions airbyte-integrations/connectors/source-mssql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ plugins {
airbyteJavaConnector {
cdkVersionRequired = '0.31.5'
features = ['db-sources']
useLocalCdk = false
useLocalCdk = true
}

java {
// TODO: rewrite code to avoid javac wornings in the first place
// TODO: rewrite code to avoid javac warnings in the first place
compileJava {
options.compilerArgs += "-Xlint:-try,-rawtypes"
}
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mssql/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
dockerImageTag: 4.0.17
dockerImageTag: 4.0.18
dockerRepository: airbyte/source-mssql
documentationUrl: https://docs.airbyte.com/integrations/sources/mssql
githubIssueLabel: source-mssql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,20 @@ public enum ReplicationMethod {

@VisibleForTesting
static boolean isCdc(final JsonNode config) {
// new replication method config since version 0.4.0
if (config.hasNonNull(LEGACY_REPLICATION_FIELD) && config.get(LEGACY_REPLICATION_FIELD).isObject()) {
final JsonNode replicationConfig = config.get(LEGACY_REPLICATION_FIELD);
return ReplicationMethod.valueOf(replicationConfig.get(METHOD_FIELD).asText()) == ReplicationMethod.CDC;
}
// legacy replication method config before version 0.4.0
if (config.hasNonNull(LEGACY_REPLICATION_FIELD) && config.get(LEGACY_REPLICATION_FIELD).isTextual()) {
return ReplicationMethod.valueOf(config.get(LEGACY_REPLICATION_FIELD).asText()) == ReplicationMethod.CDC;
}
if (config.hasNonNull(REPLICATION_FIELD)) {
final JsonNode replicationConfig = config.get(REPLICATION_FIELD);
return ReplicationMethod.valueOf(replicationConfig.get(REPLICATION_TYPE_FIELD).asText()) == ReplicationMethod.CDC;
if (config != null) {
// new replication method config since version 0.4.0
if (config.hasNonNull(LEGACY_REPLICATION_FIELD) && config.get(LEGACY_REPLICATION_FIELD).isObject()) {
final JsonNode replicationConfig = config.get(LEGACY_REPLICATION_FIELD);
return ReplicationMethod.valueOf(replicationConfig.get(METHOD_FIELD).asText()) == ReplicationMethod.CDC;
}
// legacy replication method config before version 0.4.0
if (config.hasNonNull(LEGACY_REPLICATION_FIELD) && config.get(LEGACY_REPLICATION_FIELD).isTextual()) {
return ReplicationMethod.valueOf(config.get(LEGACY_REPLICATION_FIELD).asText()) == ReplicationMethod.CDC;
}
if (config.hasNonNull(REPLICATION_FIELD)) {
final JsonNode replicationConfig = config.get(REPLICATION_FIELD);
return ReplicationMethod.valueOf(replicationConfig.get(REPLICATION_TYPE_FIELD).asText()) == ReplicationMethod.CDC;
}
}

return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ public AirbyteMessage saveState(final Map<String, String> offset, final SchemaHi
state.put(IS_COMPRESSED, dbHistory.isCompressed());

final JsonNode asJson = Jsons.jsonNode(state);

LOGGER.info("debezium state offset: {}", Jsons.jsonNode(offset));

final CdcState cdcState = new CdcState().withState(asJson);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import io.airbyte.cdk.integrations.source.relationaldb.models.InternalModels.StateType;
import io.airbyte.cdk.integrations.source.relationaldb.state.StateManager;
import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import java.math.BigDecimal;
Expand Down Expand Up @@ -185,46 +185,46 @@ public static Map<io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair,

final Map<io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair, CursorBasedStatus> cursorBasedStatusMap = new HashMap<>();
streams.forEach(stream -> {
try {
final String name = stream.getStream().getName();
final String namespace = stream.getStream().getNamespace();
final String fullTableName =
getFullyQualifiedTableNameWithQuoting(namespace, name, quoteString);

final Optional<CursorInfo> cursorInfoOptional =
stateManager.getCursorInfo(new io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair(name, namespace));
if (cursorInfoOptional.isEmpty()) {
throw new RuntimeException(String.format("Stream %s was not provided with an appropriate cursor", stream.getStream().getName()));
}
final String name = stream.getStream().getName();
final String namespace = stream.getStream().getNamespace();
final String fullTableName =
getFullyQualifiedTableNameWithQuoting(namespace, name, quoteString);

LOGGER.info("Querying max cursor value for {}.{}", namespace, name);
final String cursorField = cursorInfoOptional.get().getCursorField();
final Optional<CursorInfo> cursorInfoOptional =
stateManager.getCursorInfo(new AirbyteStreamNameNamespacePair(name, namespace));
if (cursorInfoOptional.isEmpty()) {
throw new RuntimeException(String.format("Stream %s was not provided with an appropriate cursor", stream.getStream().getName()));
}
final CursorBasedStatus cursorBasedStatus = new CursorBasedStatus();
final Optional<String> maybeCursorField = Optional.ofNullable(cursorInfoOptional.get().getCursorField());
maybeCursorField.ifPresent(cursorField -> {
LOGGER.info("Cursor {}. Querying max cursor value for {}.{}", cursorField, namespace, name);
final String quotedCursorField = getIdentifierWithQuoting(cursorField, quoteString);
final String cursorBasedSyncStatusQuery = String.format(MAX_CURSOR_VALUE_QUERY,
quotedCursorField,
fullTableName,
quotedCursorField,
quotedCursorField,
fullTableName);
final List<JsonNode> jsonNodes = database.bufferedResultSetQuery(conn -> conn.prepareStatement(cursorBasedSyncStatusQuery).executeQuery(),
resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet));
final CursorBasedStatus cursorBasedStatus = new CursorBasedStatus();
cursorBasedStatus.setStateType(StateType.CURSOR_BASED);
cursorBasedStatus.setVersion(2L);
cursorBasedStatus.setStreamName(name);
cursorBasedStatus.setStreamNamespace(namespace);
final List<JsonNode> jsonNodes;
try {
jsonNodes = database.bufferedResultSetQuery(conn -> conn.prepareStatement(cursorBasedSyncStatusQuery).executeQuery(),
resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet));
} catch (SQLException e) {
throw new RuntimeException("Failed to read max cursor value from %s.%s".formatted(namespace, name), e);
}
cursorBasedStatus.setCursorField(ImmutableList.of(cursorField));

if (!jsonNodes.isEmpty()) {
final JsonNode result = jsonNodes.get(0);
cursorBasedStatus.setCursor(result.get(cursorField).asText());
cursorBasedStatus.setCursorRecordCount((long) jsonNodes.size());
}

cursorBasedStatusMap.put(new io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair(name, namespace), cursorBasedStatus);
} catch (final SQLException e) {
throw new RuntimeException(e);
}
cursorBasedStatus.setStateType(StateType.CURSOR_BASED);
cursorBasedStatus.setVersion(2L);
cursorBasedStatus.setStreamName(name);
cursorBasedStatus.setStreamNamespace(namespace);
cursorBasedStatusMap.put(new AirbyteStreamNameNamespacePair(name, namespace), cursorBasedStatus);
});
});

return cursorBasedStatusMap;
Expand Down
Loading

0 comments on commit 5432fab

Please sign in to comment.