diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 1e9f70690b9a..a04beef39b77 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -1338,7 +1338,7 @@ - name: Postgres sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750 dockerRepository: airbyte/source-postgres - dockerImageTag: 1.0.36 + dockerImageTag: 1.0.37 documentationUrl: https://docs.airbyte.com/integrations/sources/postgres icon: postgresql.svg sourceType: database diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index aa679916a391..0193c183b78c 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -11395,7 +11395,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-postgres:1.0.36" +- dockerImage: "airbyte/source-postgres:1.0.37" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres" connectionSpecification: diff --git a/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/JdbcUtils.java b/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/JdbcUtils.java index 5529b35668c4..7fe8673241f2 100644 --- a/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/JdbcUtils.java +++ b/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/JdbcUtils.java @@ -54,6 +54,13 @@ public class JdbcUtils { public static final String MODE_KEY = "mode"; public static final String AMPERSAND = "&"; public static final String EQUALS = "="; + + // An estimate for how much additional data in sent over the wire due to conversion of source data + // into {@link AirbyteMessage}. This is due to + // the fact that records are in JSON format and all database fields are converted to Strings. + // Currently, this is used in the logic for emitting + // estimate trace messages. + public static final int PLATFORM_DATA_INCREASE_FACTOR = 2; public static final Set ALLOWED_CURSOR_TYPES = Set.of(TIMESTAMP, TIME, DATE, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, REAL, NUMERIC, DECIMAL, NVARCHAR, VARCHAR, LONGVARCHAR); private static final JdbcSourceOperations defaultSourceOperations = new JdbcSourceOperations(); diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/AirbyteTraceMessageUtility.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/AirbyteTraceMessageUtility.java index 8b30e9a9f107..38fa74d890ba 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/AirbyteTraceMessageUtility.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/AirbyteTraceMessageUtility.java @@ -6,6 +6,7 @@ import io.airbyte.protocol.models.v0.AirbyteErrorTraceMessage; import io.airbyte.protocol.models.v0.AirbyteErrorTraceMessage.FailureType; +import io.airbyte.protocol.models.v0.AirbyteEstimateTraceMessage; import io.airbyte.protocol.models.v0.AirbyteMessage; import io.airbyte.protocol.models.v0.AirbyteMessage.Type; import io.airbyte.protocol.models.v0.AirbyteTraceMessage; @@ -24,6 +25,21 @@ public static void emitConfigErrorTrace(final Throwable e, final String displayM emitErrorTrace(e, displayMessage, FailureType.CONFIG_ERROR); } + public static void emitEstimateTrace(final long byteEstimate, + final AirbyteEstimateTraceMessage.Type type, + final long rowEstimate, + final String streamName, + final String streamNamespace) { + emitMessage(makeAirbyteMessageFromTraceMessage( + makeAirbyteTraceMessage(AirbyteTraceMessage.Type.ESTIMATE) + .withEstimate(new AirbyteEstimateTraceMessage() + .withByteEstimate(byteEstimate) + .withType(type) + .withRowEstimate(rowEstimate) + .withName(streamName) + .withNamespace(streamNamespace)))); + } + public static void emitErrorTrace(final Throwable e, final String displayMessage, final FailureType failureType) { emitMessage(makeErrorTraceAirbyteMessage(e, displayMessage, failureType)); } @@ -35,10 +51,10 @@ public static void emitErrorTrace(final Throwable e, final String displayMessage // public void emitNotificationTrace() {} // public void emitMetricTrace() {} - private static void emitMessage(AirbyteMessage message) { + private static void emitMessage(final AirbyteMessage message) { // Not sure why defaultOutputRecordCollector is under Destination specifically, // but this matches usage elsewhere in base-java - Consumer outputRecordCollector = Destination::defaultOutputRecordCollector; + final Consumer outputRecordCollector = Destination::defaultOutputRecordCollector; outputRecordCollector.accept(message); } @@ -56,7 +72,7 @@ private static AirbyteMessage makeErrorTraceAirbyteMessage( .withStackTrace(ExceptionUtils.getStackTrace(e)))); } - private static AirbyteMessage makeAirbyteMessageFromTraceMessage(AirbyteTraceMessage airbyteTraceMessage) { + private static AirbyteMessage makeAirbyteMessageFromTraceMessage(final AirbyteTraceMessage airbyteTraceMessage) { return new AirbyteMessage().withType(Type.TRACE).withTrace(airbyteTraceMessage); } diff --git a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java index b5a4a8c249ec..a3ec2b81e7cc 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java +++ b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java @@ -401,11 +401,11 @@ protected String getCountColumnName() { return "record_count"; } - private long getActualCursorRecordCount(final Connection connection, - final String fullTableName, - final String quotedCursorField, - final Datatype cursorFieldType, - final String cursor) + protected long getActualCursorRecordCount(final Connection connection, + final String fullTableName, + final String quotedCursorField, + final Datatype cursorFieldType, + final String cursor) throws SQLException { final String columnName = getCountColumnName(); final PreparedStatement cursorRecordStatement; diff --git a/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile b/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile index 0978075777e9..df697ded6fa6 100644 --- a/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile +++ b/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres-strict-encrypt COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=1.0.36 +LABEL io.airbyte.version=1.0.37 LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt diff --git a/airbyte-integrations/connectors/source-postgres/Dockerfile b/airbyte-integrations/connectors/source-postgres/Dockerfile index 6edcec59beb5..291e5c2c7763 100644 --- a/airbyte-integrations/connectors/source-postgres/Dockerfile +++ b/airbyte-integrations/connectors/source-postgres/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=1.0.36 +LABEL io.airbyte.version=1.0.37 LABEL io.airbyte.name=airbyte/source-postgres diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresQueryUtils.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresQueryUtils.java new file mode 100644 index 000000000000..23140901e3a7 --- /dev/null +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresQueryUtils.java @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.source.postgres; + +/** + * Utility class to define constants related to querying postgres + */ +public class PostgresQueryUtils { + + public static final String NULL_CURSOR_VALUE_WITH_SCHEMA_QUERY = + """ + SELECT + (EXISTS (SELECT FROM information_schema.columns WHERE table_schema = '%s' AND table_name = '%s' AND is_nullable = 'YES' AND column_name = '%s')) + AND + (EXISTS (SELECT from \"%s\".\"%s\" where \"%s\" IS NULL LIMIT 1)) AS %s + """; + public static final String NULL_CURSOR_VALUE_NO_SCHEMA_QUERY = + """ + SELECT + (EXISTS (SELECT FROM information_schema.columns WHERE table_name = '%s' AND is_nullable = 'YES' AND column_name = '%s')) + AND + (EXISTS (SELECT from \"%s\" where \"%s\" IS NULL LIMIT 1)) AS %s + """; + + public static final String TABLE_ESTIMATE_QUERY = + """ + SELECT (SELECT COUNT(*) FROM %s) AS %s, + pg_relation_size('%s') AS %s; + """; + + public static final String ROW_COUNT_RESULT_COL = "rowcount"; + + public static final String TOTAL_BYTES_RESULT_COL = "totalbytes"; + +} diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java index d7f283127d3d..d2a3aab3609d 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java @@ -6,8 +6,16 @@ import static io.airbyte.db.jdbc.JdbcUtils.AMPERSAND; import static io.airbyte.db.jdbc.JdbcUtils.EQUALS; +import static io.airbyte.db.jdbc.JdbcUtils.PLATFORM_DATA_INCREASE_FACTOR; import static io.airbyte.integrations.debezium.AirbyteDebeziumHandler.shouldUseCDC; import static io.airbyte.integrations.source.jdbc.JdbcSSLConnectionUtils.PARAM_CA_CERTIFICATE; +import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.NULL_CURSOR_VALUE_NO_SCHEMA_QUERY; +import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.NULL_CURSOR_VALUE_WITH_SCHEMA_QUERY; +import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.ROW_COUNT_RESULT_COL; +import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.TABLE_ESTIMATE_QUERY; +import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.TOTAL_BYTES_RESULT_COL; +import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.getFullyQualifiedTableNameWithQuoting; +import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.getIdentifierWithQuoting; import static io.airbyte.integrations.util.PostgresSslConnectionUtils.DISABLE; import static io.airbyte.integrations.util.PostgresSslConnectionUtils.PARAM_SSL_MODE; import static java.util.stream.Collectors.toList; @@ -29,6 +37,7 @@ import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.db.jdbc.JdbcUtils; import io.airbyte.db.jdbc.streaming.AdaptiveStreamingQueryConfig; +import io.airbyte.integrations.base.AirbyteTraceMessageUtility; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.base.Source; import io.airbyte.integrations.base.ssh.SshWrappedSource; @@ -37,6 +46,7 @@ import io.airbyte.integrations.source.jdbc.AbstractJdbcSource; import io.airbyte.integrations.source.jdbc.JdbcSSLConnectionUtils; import io.airbyte.integrations.source.jdbc.dto.JdbcPrivilegeDto; +import io.airbyte.integrations.source.relationaldb.CursorInfo; import io.airbyte.integrations.source.relationaldb.TableInfo; import io.airbyte.integrations.source.relationaldb.models.CdcState; import io.airbyte.integrations.source.relationaldb.models.DbState; @@ -46,6 +56,7 @@ import io.airbyte.protocol.models.v0.AirbyteCatalog; import io.airbyte.protocol.models.v0.AirbyteConnectionStatus; import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status; +import io.airbyte.protocol.models.v0.AirbyteEstimateTraceMessage.Type; import io.airbyte.protocol.models.v0.AirbyteGlobalState; import io.airbyte.protocol.models.v0.AirbyteMessage; import io.airbyte.protocol.models.v0.AirbyteStateMessage; @@ -93,20 +104,7 @@ public class PostgresSource extends AbstractJdbcSource implements public static final String SSL_KEY = "sslkey"; public static final String SSL_PASSWORD = "sslpassword"; public static final String MODE = "mode"; - public static final String NULL_CURSOR_VALUE_WITH_SCHEMA = - """ - SELECT - (EXISTS (SELECT FROM information_schema.columns WHERE table_schema = '%s' AND table_name = '%s' AND is_nullable = 'YES' AND column_name = '%s')) - AND - (EXISTS (SELECT from \"%s\".\"%s\" where \"%s\" IS NULL LIMIT 1)) AS %s - """; - public static final String NULL_CURSOR_VALUE_NO_SCHEMA = - """ - SELECT - (EXISTS (SELECT FROM information_schema.columns WHERE table_name = '%s' AND is_nullable = 'YES' AND column_name = '%s')) - AND - (EXISTS (SELECT from \"%s\" where \"%s\" IS NULL LIMIT 1)) AS %s - """; + private List schemas; private final FeatureFlags featureFlags; private static final Set INVALID_CDC_SSL_MODES = ImmutableSet.of("allow", "prefer"); @@ -535,10 +533,10 @@ protected boolean verifyCursorColumnValues(final JdbcDatabase database, final St final String resultColName = "nullValue"; // Query: Only if cursor column allows null values, query whether it contains one if (StringUtils.isNotBlank(schema)) { - query = String.format(NULL_CURSOR_VALUE_WITH_SCHEMA, + query = String.format(NULL_CURSOR_VALUE_WITH_SCHEMA_QUERY, schema, tableName, columnName, schema, tableName, columnName, resultColName); } else { - query = String.format(NULL_CURSOR_VALUE_NO_SCHEMA, + query = String.format(NULL_CURSOR_VALUE_NO_SCHEMA_QUERY, tableName, columnName, tableName, columnName, resultColName); } LOGGER.debug("null value query: {}", query); @@ -550,4 +548,123 @@ protected boolean verifyCursorColumnValues(final JdbcDatabase database, final St return !nullValExist; } + @Override + protected void estimateFullRefreshSyncSize(final JdbcDatabase database, + final ConfiguredAirbyteStream configuredAirbyteStream) { + try { + final String schemaName = configuredAirbyteStream.getStream().getNamespace(); + final String tableName = configuredAirbyteStream.getStream().getName(); + final String fullTableName = + getFullyQualifiedTableNameWithQuoting(schemaName, tableName, getQuoteString()); + + final List tableEstimateResult = getFullTableEstimate(database, fullTableName); + + if (!tableEstimateResult.isEmpty() && tableEstimateResult.get(0).has(ROW_COUNT_RESULT_COL) && + tableEstimateResult.get(0).has(TOTAL_BYTES_RESULT_COL)) { + final long syncRowCount = tableEstimateResult.get(0).get(ROW_COUNT_RESULT_COL).asLong(); + final long syncByteCount = tableEstimateResult.get(0).get(TOTAL_BYTES_RESULT_COL).asLong(); + + // Here, we double the bytes estimate to account for serialization. Perhaps a better way to do this + // is to + // read a row and Stringify it to better understand the accurate volume of data sent over the wire. + // However, this approach doesn't account for different row sizes. + AirbyteTraceMessageUtility.emitEstimateTrace(PLATFORM_DATA_INCREASE_FACTOR * syncByteCount, Type.STREAM, syncRowCount, tableName, schemaName); + LOGGER.info(String.format("Estimate for table: %s : {sync_row_count: %s, sync_bytes: %s, total_table_row_count: %s, total_table_bytes: %s}", + fullTableName, syncRowCount, syncByteCount, syncRowCount, syncByteCount)); + } + } catch (final SQLException e) { + LOGGER.warn("Error occurred while attempting to estimate sync size", e); + } + } + + @Override + protected void estimateIncrementalSyncSize(final JdbcDatabase database, + final ConfiguredAirbyteStream configuredAirbyteStream, + final CursorInfo cursorInfo, + final PostgresType cursorFieldType) { + try { + final String schemaName = configuredAirbyteStream.getStream().getNamespace(); + final String tableName = configuredAirbyteStream.getStream().getName(); + final String fullTableName = + getFullyQualifiedTableNameWithQuoting(schemaName, tableName, getQuoteString()); + + final List tableEstimateResult = getFullTableEstimate(database, fullTableName); + + final long tableRowCount = tableEstimateResult.get(0).get(ROW_COUNT_RESULT_COL).asLong(); + final long tableByteCount = tableEstimateResult.get(0).get(TOTAL_BYTES_RESULT_COL).asLong(); + + final long syncRowCount; + final long syncByteCount; + + syncRowCount = getIncrementalTableRowCount(database, fullTableName, cursorInfo, cursorFieldType); + if (tableRowCount == 0) { + syncByteCount = 0; + } else { + syncByteCount = (tableByteCount / tableRowCount) * syncRowCount; + } + + // Here, we double the bytes estimate to account for serialization. Perhaps a better way to do this + // is to + // read a row and Stringify it to better understand the accurate volume of data sent over the wire. + // However, this approach doesn't account for different row sizes + AirbyteTraceMessageUtility.emitEstimateTrace(PLATFORM_DATA_INCREASE_FACTOR * syncByteCount, Type.STREAM, syncRowCount, tableName, schemaName); + LOGGER.info(String.format("Estimate for table: %s : {sync_row_count: %s, sync_bytes: %s, total_table_row_count: %s, total_table_bytes: %s}", + fullTableName, syncRowCount, syncByteCount, tableRowCount, tableRowCount)); + } catch (final SQLException e) { + LOGGER.warn("Error occurred while attempting to estimate sync size", e); + } + } + + private List getFullTableEstimate(final JdbcDatabase database, final String fullTableName) throws SQLException { + // Construct the table estimate query. + final String tableEstimateQuery = + String.format(TABLE_ESTIMATE_QUERY, fullTableName, ROW_COUNT_RESULT_COL, fullTableName, TOTAL_BYTES_RESULT_COL); + LOGGER.debug("table estimate query: {}", tableEstimateQuery); + final List jsonNodes = database.bufferedResultSetQuery(conn -> conn.createStatement().executeQuery(tableEstimateQuery), + resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet)); + Preconditions.checkState(jsonNodes.size() == 1); + return jsonNodes; + } + + private long getIncrementalTableRowCount(final JdbcDatabase database, + final String fullTableName, + final CursorInfo cursorInfo, + final PostgresType cursorFieldType) + throws SQLException { + final String quotedCursorField = getIdentifierWithQuoting(cursorInfo.getCursorField(), getQuoteString()); + + // Calculate actual number of rows to sync here. + final List result = database.queryJsons( + connection -> { + LOGGER.info("Preparing query for table: {}", fullTableName); + final String operator; + if (cursorInfo.getCursorRecordCount() <= 0L) { + operator = ">"; + } else { + final long actualRecordCount = getActualCursorRecordCount( + connection, fullTableName, quotedCursorField, cursorFieldType, cursorInfo.getCursor()); + LOGGER.info("Table {} cursor count: expected {}, actual {}", fullTableName, cursorInfo.getCursorRecordCount(), actualRecordCount); + if (actualRecordCount == cursorInfo.getCursorRecordCount()) { + operator = ">"; + } else { + operator = ">="; + } + } + + final StringBuilder sql = new StringBuilder(String.format("SELECT COUNT(*) FROM %s WHERE %s %s ?", + fullTableName, + quotedCursorField, + operator)); + + final PreparedStatement preparedStatement = connection.prepareStatement(sql.toString()); + LOGGER.info("Executing query for table {}: {}", fullTableName, preparedStatement); + sourceOperations.setCursorField(preparedStatement, 1, cursorFieldType, cursorInfo.getCursor()); + return preparedStatement; + }, + resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet)); + + Preconditions.checkState(result.size() == 1); + return result.get(0).get("count").asLong(); + } + } diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java index ec3a026ad17a..77359c4396fc 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java @@ -31,6 +31,8 @@ import io.airbyte.protocol.models.JsonSchemaType; import io.airbyte.protocol.models.v0.AirbyteCatalog; import io.airbyte.protocol.models.v0.AirbyteMessage; +import io.airbyte.protocol.models.v0.AirbyteMessage.Type; +import io.airbyte.protocol.models.v0.AirbyteStateMessage; import io.airbyte.protocol.models.v0.AirbyteStream; import io.airbyte.protocol.models.v0.CatalogHelpers; import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; @@ -40,6 +42,7 @@ import io.airbyte.test.utils.PostgreSQLContainerHelper; import java.math.BigDecimal; import java.sql.SQLException; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; @@ -101,6 +104,8 @@ class PostgresSourceTest { .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) .withSourceDefinedPrimaryKey(List.of(List.of("id"))))); private static final ConfiguredAirbyteCatalog CONFIGURED_CATALOG = CatalogHelpers.toDefaultConfiguredCatalog(CATALOG); + private static final ConfiguredAirbyteCatalog CONFIGURED_INCR_CATALOG = toIncrementalConfiguredCatalog(CATALOG); + private static final Set ASCII_MESSAGES = Sets.newHashSet( createRecord(STREAM_NAME, SCHEMA_NAME, map("id", new BigDecimal("1.0"), "name", "goku", "power", null)), createRecord(STREAM_NAME, SCHEMA_NAME, map("id", new BigDecimal("2.0"), "name", "vegeta", "power", 9000.1)), @@ -458,6 +463,34 @@ void testReadSuccess() throws Exception { assertEquals(ASCII_MESSAGES, actualMessages); } + @Test + void testReadIncrementalSuccess() throws Exception { + final ConfiguredAirbyteCatalog configuredCatalog = + CONFIGURED_INCR_CATALOG + .withStreams(CONFIGURED_INCR_CATALOG.getStreams().stream().filter(s -> s.getStream().getName().equals(STREAM_NAME)).collect( + Collectors.toList())); + final Set actualMessages = MoreIterators.toSet(new PostgresSource().read(getConfig(PSQL_DB, dbName), configuredCatalog, null)); + setEmittedAtToNull(actualMessages); + + final List stateAfterFirstBatch = extractStateMessage(actualMessages); + + setEmittedAtToNull(actualMessages); + + // An extra state message is emitted, in addition to the record messages. + assertEquals(actualMessages.size(), ASCII_MESSAGES.size() + 1); + assertThat(actualMessages.contains(ASCII_MESSAGES)); + + final JsonNode state = Jsons.jsonNode(stateAfterFirstBatch); + + // Incremental sync should only read one new message (where id = 'NaN') + final Set nextSyncMessages = MoreIterators.toSet(new PostgresSource().read(getConfig(PSQL_DB, dbName), configuredCatalog, state)); + setEmittedAtToNull(nextSyncMessages); + + // An extra state message is emitted, in addition to the record messages. + assertEquals(nextSyncMessages.size(), 2); + assertThat(nextSyncMessages.contains(createRecord(STREAM_NAME, SCHEMA_NAME, map("id", null, "name", "piccolo", "power", null)))); + } + @Test void testIsCdc() { final JsonNode config = getConfig(PSQL_DB, dbName); @@ -634,4 +667,26 @@ CREATE VIEW test_view_null_cursor(id) as } + private static List extractStateMessage(final Set messages) { + return messages.stream().filter(r -> r.getType() == Type.STATE).map(AirbyteMessage::getState) + .collect(Collectors.toList()); + } + + private static ConfiguredAirbyteCatalog toIncrementalConfiguredCatalog(final AirbyteCatalog catalog) { + return new ConfiguredAirbyteCatalog() + .withStreams(catalog.getStreams() + .stream() + .map(s -> toIncrementalConfiguredStream(s)) + .toList()); + } + + private static ConfiguredAirbyteStream toIncrementalConfiguredStream(final AirbyteStream stream) { + return new ConfiguredAirbyteStream() + .withStream(stream) + .withSyncMode(SyncMode.INCREMENTAL) + .withCursorField(List.of("id")) + .withDestinationSyncMode(DestinationSyncMode.APPEND) + .withPrimaryKey(new ArrayList<>()); + } + } diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java index edcebf253b24..8e2fbfa55f25 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java @@ -248,6 +248,30 @@ protected boolean verifyCursorColumnValues(final Database database, final String return true; } + /** + * Estimates the total volume (rows and bytes) to sync and emits a + * {@link AirbyteEstimateTraceMessage} associated with the full refresh stream. + * + * @param database database + */ + protected void estimateFullRefreshSyncSize(final Database database, + final ConfiguredAirbyteStream configuredAirbyteStream) { + /* no-op */ + } + + /** + * Estimates the total volume (rows and bytes) to sync and emits a + * {@link AirbyteEstimateTraceMessage} associated with an incremental stream. + * + * @param database database + */ + protected void estimateIncrementalSyncSize(final Database database, + final ConfiguredAirbyteStream configuredAirbyteStream, + final CursorInfo cursorInfo, + final DataType dataType) { + /* no-op */ + } + private List>> discoverWithoutSystemTables( final Database database) throws Exception { @@ -378,6 +402,7 @@ private AutoCloseableIterator createReadIterator(final Database emittedAt); } else { // if no cursor is present then this is the first read for is the same as doing a full refresh read. + estimateFullRefreshSyncSize(database, airbyteStream); airbyteMessageIterator = getFullRefreshStream(database, streamName, namespace, selectedDatabaseFields, table, emittedAt); } @@ -396,6 +421,7 @@ private AutoCloseableIterator createReadIterator(final Database getStateEmissionFrequency()), airbyteMessageIterator); } else if (airbyteStream.getSyncMode() == SyncMode.FULL_REFRESH) { + estimateFullRefreshSyncSize(database, airbyteStream); iterator = getFullRefreshStream(database, streamName, namespace, selectedDatabaseFields, table, emittedAt); } else if (airbyteStream.getSyncMode() == null) { @@ -445,6 +471,7 @@ private AutoCloseableIterator getIncrementalStream(final Databas table.getFields().stream().anyMatch(f -> f.getName().equals(cursorField)), String.format("Could not find cursor field %s in table %s", cursorField, table.getName())); + estimateIncrementalSyncSize(database, airbyteStream, cursorInfo, cursorType); final AutoCloseableIterator queryIterator = queryTableIncremental( database, selectedDatabaseFields, diff --git a/docs/integrations/sources/postgres.md b/docs/integrations/sources/postgres.md index 7e401765c920..fb08e2c5bb24 100644 --- a/docs/integrations/sources/postgres.md +++ b/docs/integrations/sources/postgres.md @@ -411,6 +411,7 @@ The root causes is that the WALs needed for the incremental sync has been remove | Version | Date | Pull Request | Subject | |:--------|:-----------|:----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 1.0.37 | 2023-01-17 | [20783](https://github.com/airbytehq/airbyte/pull/20783) | Emit estimate trace messages for non-CDC mode. | | 1.0.36 | 2023-01-11 | [21003](https://github.com/airbytehq/airbyte/pull/21003) | Handle null values for array data types in CDC mode gracefully. | | 1.0.35 | 2023-01-04 | [20469](https://github.com/airbytehq/airbyte/pull/20469) | Introduce feature to make LSN commit behaviour configurable. | | 1.0.34 | 2022-12-13 | [20378](https://github.com/airbytehq/airbyte/pull/20378) | Improve descriptions |