From aa40c0c0ea128226e6187b0f03dee4edfaf9e620 Mon Sep 17 00:00:00 2001 From: Akash Kulkarni Date: Wed, 21 Dec 2022 15:40:22 -0800 Subject: [PATCH 01/14] Emit estimate trace messages --- .../base/AirbyteTraceMessageUtility.java | 22 ++- .../source/jdbc/AbstractJdbcSource.java | 11 +- .../source/postgres/PostgresQueryUtils.java | 52 +++++++ .../source/postgres/PostgresSource.java | 143 ++++++++++++++++-- .../source/postgres/PostgresSourceTest.java | 79 ++++++++-- .../source/relationaldb/AbstractDbSource.java | 37 ++++- 6 files changed, 308 insertions(+), 36 deletions(-) create mode 100644 airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresQueryUtils.java diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/AirbyteTraceMessageUtility.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/AirbyteTraceMessageUtility.java index 885a307b3eb3..7a8a1a1a6535 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/AirbyteTraceMessageUtility.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/AirbyteTraceMessageUtility.java @@ -6,6 +6,7 @@ import io.airbyte.protocol.models.AirbyteErrorTraceMessage; import io.airbyte.protocol.models.AirbyteErrorTraceMessage.FailureType; +import io.airbyte.protocol.models.AirbyteEstimateTraceMessage; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteMessage.Type; import io.airbyte.protocol.models.AirbyteTraceMessage; @@ -24,6 +25,21 @@ public static void emitConfigErrorTrace(final Throwable e, final String displayM emitErrorTrace(e, displayMessage, FailureType.CONFIG_ERROR); } + public static void emitEstimateTrace(final long byteEstimate, + final AirbyteEstimateTraceMessage.Type type, + final long rowEstimate, + final String streamName, + final String streamNamespace) { + emitMessage(makeAirbyteMessageFromTraceMessage( + makeAirbyteTraceMessage(AirbyteTraceMessage.Type.ESTIMATE) + .withEstimate(new AirbyteEstimateTraceMessage() + .withByteEstimate(byteEstimate) + .withType(type) + .withRowEstimate(rowEstimate) + .withName(streamName) + .withNamespace(streamNamespace)))); + } + public static void emitErrorTrace(final Throwable e, final String displayMessage, final FailureType failureType) { emitMessage(makeErrorTraceAirbyteMessage(e, displayMessage, failureType)); } @@ -35,10 +51,10 @@ public static void emitErrorTrace(final Throwable e, final String displayMessage // public void emitNotificationTrace() {} // public void emitMetricTrace() {} - private static void emitMessage(AirbyteMessage message) { + private static void emitMessage(final AirbyteMessage message) { // Not sure why defaultOutputRecordCollector is under Destination specifically, // but this matches usage elsewhere in base-java - Consumer outputRecordCollector = Destination::defaultOutputRecordCollector; + final Consumer outputRecordCollector = Destination::defaultOutputRecordCollector; outputRecordCollector.accept(message); } @@ -56,7 +72,7 @@ private static AirbyteMessage makeErrorTraceAirbyteMessage( .withStackTrace(ExceptionUtils.getStackTrace(e)))); } - private static AirbyteMessage makeAirbyteMessageFromTraceMessage(AirbyteTraceMessage airbyteTraceMessage) { + private static AirbyteMessage makeAirbyteMessageFromTraceMessage(final AirbyteTraceMessage airbyteTraceMessage) { return new AirbyteMessage().withType(Type.TRACE).withTrace(airbyteTraceMessage); } diff --git a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java index dbde54c5cdb9..b651a98bb05f 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java +++ b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java @@ -410,11 +410,11 @@ protected String getCountColumnName() { return "record_count"; } - private long getActualCursorRecordCount(final Connection connection, - final String fullTableName, - final String quotedCursorField, - final Datatype cursorFieldType, - final String cursor) + protected long getActualCursorRecordCount(final Connection connection, + final String fullTableName, + final String quotedCursorField, + final Datatype cursorFieldType, + final String cursor) throws SQLException { final String columnName = getCountColumnName(); final PreparedStatement cursorRecordStatement; @@ -621,4 +621,5 @@ protected List identifyStreamsToSnapshot(final Configur .map(Jsons::clone) .collect(Collectors.toList()); } + } diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresQueryUtils.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresQueryUtils.java new file mode 100644 index 000000000000..a1b3b96c9fd9 --- /dev/null +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresQueryUtils.java @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.source.postgres; + +import java.sql.SQLException; + +/** + * Utility class to define constants related to querying postgres. + */ +public class PostgresQueryUtils { + + public static final String NULL_CURSOR_VALUE_WITH_SCHEMA = + """ + SELECT + (EXISTS (SELECT FROM information_schema.columns WHERE table_schema = '%s' AND table_name = '%s' AND is_nullable = 'YES' AND column_name = '%s')) + AND + (EXISTS (SELECT from %s.\"%s\" where \"%s\" IS NULL LIMIT 1)) AS %s + """; + public static final String NULL_CURSOR_VALUE_NO_SCHEMA = + """ + SELECT + (EXISTS (SELECT FROM information_schema.columns WHERE table_name = '%s' AND is_nullable = 'YES' AND column_name = '%s')) + AND + (EXISTS (SELECT from \"%s\" where \"%s\" IS NULL LIMIT 1)) AS %s + """; + + public static final String TABLE_ESTIMATE_QUERY = + """ + SELECT (SELECT COUNT(*) FROM %s) AS %s, + pg_relation_size('%s') AS %s; + """; + + public static final String ROW_COUNT_RESULT_COL = "rowcount"; + + public static final String TOTAL_BYTES_RESULT_COL = "totalbytes"; + + + public static String getFullyQualifiedTableNameWithQuoting(final String identifierQuoteString, + final String schemaName, + final String tableName) + throws SQLException { + final String quotedTableName = enquoteIdentifier(identifierQuoteString, tableName); + return schemaName != null ? enquoteIdentifier(identifierQuoteString, schemaName) + "." + quotedTableName : quotedTableName; + } + + public static String enquoteIdentifier(final String identifierQuoteString, final String identifier) { + return identifierQuoteString + identifier + identifierQuoteString; + } + +} diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java index e57254a6e7d8..2df867e79224 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java @@ -8,6 +8,13 @@ import static io.airbyte.db.jdbc.JdbcUtils.EQUALS; import static io.airbyte.integrations.debezium.AirbyteDebeziumHandler.shouldUseCDC; import static io.airbyte.integrations.source.jdbc.JdbcSSLConnectionUtils.PARAM_CA_CERTIFICATE; +import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.NULL_CURSOR_VALUE_NO_SCHEMA; +import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.NULL_CURSOR_VALUE_WITH_SCHEMA; +import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.ROW_COUNT_RESULT_COL; +import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.TABLE_ESTIMATE_QUERY; +import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.TOTAL_BYTES_RESULT_COL; +import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.enquoteIdentifier; +import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.getFullyQualifiedTableNameWithQuoting; import static io.airbyte.integrations.util.PostgresSslConnectionUtils.DISABLE; import static io.airbyte.integrations.util.PostgresSslConnectionUtils.PARAM_SSL_MODE; import static java.util.stream.Collectors.toList; @@ -18,6 +25,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import io.airbyte.commons.exceptions.ConfigErrorException; import io.airbyte.commons.features.EnvVariableFeatureFlags; import io.airbyte.commons.features.FeatureFlags; import io.airbyte.commons.functional.CheckedConsumer; @@ -29,6 +37,7 @@ import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.db.jdbc.JdbcUtils; import io.airbyte.db.jdbc.streaming.AdaptiveStreamingQueryConfig; +import io.airbyte.integrations.base.AirbyteTraceMessageUtility; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.base.Source; import io.airbyte.integrations.base.ssh.SshWrappedSource; @@ -37,6 +46,7 @@ import io.airbyte.integrations.source.jdbc.AbstractJdbcSource; import io.airbyte.integrations.source.jdbc.JdbcSSLConnectionUtils; import io.airbyte.integrations.source.jdbc.dto.JdbcPrivilegeDto; +import io.airbyte.integrations.source.relationaldb.CursorInfo; import io.airbyte.integrations.source.relationaldb.TableInfo; import io.airbyte.integrations.source.relationaldb.models.CdcState; import io.airbyte.integrations.source.relationaldb.models.DbState; @@ -45,6 +55,7 @@ import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.AirbyteConnectionStatus; import io.airbyte.protocol.models.AirbyteConnectionStatus.Status; +import io.airbyte.protocol.models.AirbyteEstimateTraceMessage.Type; import io.airbyte.protocol.models.AirbyteGlobalState; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteStateMessage; @@ -92,20 +103,7 @@ public class PostgresSource extends AbstractJdbcSource implements public static final String SSL_KEY = "sslkey"; public static final String SSL_PASSWORD = "sslpassword"; public static final String MODE = "mode"; - public static final String NULL_CURSOR_VALUE_WITH_SCHEMA = - """ - SELECT - (EXISTS (SELECT FROM information_schema.columns WHERE table_schema = '%s' AND table_name = '%s' AND is_nullable = 'YES' AND column_name = '%s')) - AND - (EXISTS (SELECT from %s.\"%s\" where \"%s\" IS NULL LIMIT 1)) AS %s - """; - public static final String NULL_CURSOR_VALUE_NO_SCHEMA = - """ - SELECT - (EXISTS (SELECT FROM information_schema.columns WHERE table_name = '%s' AND is_nullable = 'YES' AND column_name = '%s')) - AND - (EXISTS (SELECT from \"%s\" where \"%s\" IS NULL LIMIT 1)) AS %s - """; + private List schemas; private final FeatureFlags featureFlags; private static final Set INVALID_CDC_SSL_MODES = ImmutableSet.of("allow", "prefer"); @@ -517,7 +515,8 @@ protected static String toSslJdbcParamInternal(final SslMode sslMode) { } @Override - protected boolean verifyCursorColumnValues(final JdbcDatabase database, final String schema, final String tableName, final String columnName) throws SQLException { + protected boolean verifyCursorColumnValues(final JdbcDatabase database, final String schema, final String tableName, final String columnName) + throws SQLException { final String query; final String resultColName = "nullValue"; // Query: Only if cursor column allows null values, query whether it contains one @@ -536,4 +535,118 @@ protected boolean verifyCursorColumnValues(final JdbcDatabase database, final St LOGGER.debug("null value exist: {}", nullValExist); return !nullValExist; } + + @Override + protected void estimateFullRefreshSyncSize(final JdbcDatabase database, + final ConfiguredAirbyteStream configuredAirbyteStream) { + try { + final String schemaName = configuredAirbyteStream.getStream().getNamespace(); + final String tableName = configuredAirbyteStream.getStream().getName(); + final String fullTableName = + getFullyQualifiedTableNameWithQuoting(getQuoteString(), schemaName, tableName); + + final List tableEstimateResult = getFullTableEstimate(database, fullTableName); + + final long syncRowCount = tableEstimateResult.get(0).get(ROW_COUNT_RESULT_COL).asLong(); + final long syncByteCount = tableEstimateResult.get(0).get(TOTAL_BYTES_RESULT_COL).asLong(); + + // Here, we double the bytes estimate to account for serialization. Perhaps a better way to do this + // is to + // read a row and Stringify it to better understand the accurate volume of data sent over the wire. + // However, this approach doesn't account for different row sizes. + AirbyteTraceMessageUtility.emitEstimateTrace(2 * syncByteCount, Type.STREAM, syncRowCount, tableName, schemaName); + LOGGER.info("estimate for table: " + fullTableName + " : {total_bytes:" + syncByteCount + ", row_count : " + syncRowCount); + } catch (final SQLException e) { + throw new ConfigErrorException("Error occurred while attempting to estimate sync size", e); + } + } + + @Override + protected void estimateIncrementalSyncSize(final JdbcDatabase database, + final ConfiguredAirbyteStream configuredAirbyteStream, + final CursorInfo cursorInfo, + final PostgresType cursorFieldType) { + try { + final String schemaName = configuredAirbyteStream.getStream().getNamespace(); + final String tableName = configuredAirbyteStream.getStream().getName(); + final String fullTableName = + getFullyQualifiedTableNameWithQuoting(getQuoteString(), schemaName, tableName); + + final List tableEstimateResult = getFullTableEstimate(database, fullTableName); + + final long tableRowCount = tableEstimateResult.get(0).get(ROW_COUNT_RESULT_COL).asLong(); + final long tableByteCount = tableEstimateResult.get(0).get(TOTAL_BYTES_RESULT_COL).asLong(); + + final long syncRowCount; + final long syncByteCount; + + syncRowCount = getIncrementalTableRowCount(database, fullTableName, cursorInfo, cursorFieldType); + syncByteCount = (tableByteCount / tableRowCount) * syncRowCount; + + // Here, we double the bytes estimate to account for serialization. Perhaps a better way to do this + // is to + // read a row and Stringify it to better understand the accurate volume of data sent over the wire. + // However, this approach doesn't account for different row sizes + AirbyteTraceMessageUtility.emitEstimateTrace(2 * syncByteCount, Type.STREAM, syncRowCount, tableName, schemaName); + LOGGER.info("estimate for table: " + fullTableName + " : {row_count:" + syncRowCount + ", total_bytes : " + syncByteCount); + } catch (final SQLException e) { + throw new ConfigErrorException("Error occurred while attempting to estimate sync size", e); + } + } + + private List getFullTableEstimate(final JdbcDatabase database, final String fullTableName) throws SQLException { + // Construct the table estimate query. + final String tableEstimateQuery = + String.format(TABLE_ESTIMATE_QUERY, fullTableName, ROW_COUNT_RESULT_COL, fullTableName, TOTAL_BYTES_RESULT_COL); + LOGGER.debug("table estimate query: {}", tableEstimateQuery); + final List jsonNodes = database.bufferedResultSetQuery(conn -> conn.createStatement().executeQuery(tableEstimateQuery), + resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet)); + Preconditions.checkState(jsonNodes.size() == 1); + return jsonNodes; + } + + private long getIncrementalTableRowCount(final JdbcDatabase database, + final String fullTableName, + final CursorInfo cursorInfo, + final PostgresType cursorFieldType) { + try { + final String quotedCursorField = enquoteIdentifier(getQuoteString(), cursorInfo.getCursorField()); + + // Calculate actual number of rows to sync here. + final List result = database.queryJsons( + connection -> { + LOGGER.info("Preparing query for table: {}", fullTableName); + final String operator; + if (cursorInfo.getCursorRecordCount() <= 0L) { + operator = ">"; + } else { + final long actualRecordCount = getActualCursorRecordCount( + connection, fullTableName, quotedCursorField, cursorFieldType, cursorInfo.getCursor()); + LOGGER.info("Table {} cursor count: expected {}, actual {}", fullTableName, cursorInfo.getCursorRecordCount(), actualRecordCount); + if (actualRecordCount == cursorInfo.getCursorRecordCount()) { + operator = ">"; + } else { + operator = ">="; + } + } + + final StringBuilder sql = new StringBuilder(String.format("SELECT COUNT(*) FROM %s WHERE %s %s ?", + fullTableName, + quotedCursorField, + operator)); + + final PreparedStatement preparedStatement = connection.prepareStatement(sql.toString()); + LOGGER.info("Executing query for table {}: {}", fullTableName, preparedStatement); + sourceOperations.setStatementField(preparedStatement, 1, cursorFieldType, cursorInfo.getCursor()); + return preparedStatement; + }, + resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet)); + + Preconditions.checkState(result.size() == 1); + return result.get(0).get("count").asLong(); + } catch (final SQLException e) { + throw new ConfigErrorException("Error occurred while attempting to estimate sync size", e); + } + } + } diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java index ef6d2d354889..7d23502f637a 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java @@ -29,6 +29,8 @@ import io.airbyte.db.jdbc.JdbcUtils; import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.AirbyteMessage.Type; +import io.airbyte.protocol.models.AirbyteStateMessage; import io.airbyte.protocol.models.AirbyteStream; import io.airbyte.protocol.models.CatalogHelpers; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; @@ -40,6 +42,7 @@ import io.airbyte.test.utils.PostgreSQLContainerHelper; import java.math.BigDecimal; import java.sql.SQLException; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; @@ -101,6 +104,8 @@ class PostgresSourceTest { .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) .withSourceDefinedPrimaryKey(List.of(List.of("id"))))); private static final ConfiguredAirbyteCatalog CONFIGURED_CATALOG = CatalogHelpers.toDefaultConfiguredCatalog(CATALOG); + private static final ConfiguredAirbyteCatalog CONFIGURED_INCR_CATALOG = toIncrementalConfiguredCatalog(CATALOG); + private static final Set ASCII_MESSAGES = Sets.newHashSet( createRecord(STREAM_NAME, SCHEMA_NAME, map("id", new BigDecimal("1.0"), "name", "goku", "power", null)), createRecord(STREAM_NAME, SCHEMA_NAME, map("id", new BigDecimal("2.0"), "name", "vegeta", "power", 9000.1)), @@ -122,6 +127,8 @@ class PostgresSourceTest { private String dbName; + private Database database; + @BeforeAll static void init() { PSQL_DB = new PostgreSQLContainer<>("postgres:13-alpine"); @@ -139,7 +146,7 @@ void setup() throws Exception { final JsonNode config = getConfig(PSQL_DB, dbName); try (final DSLContext dslContext = getDslContext(config)) { - final Database database = getDatabase(dslContext); + this.database = getDatabase(dslContext); database.query(ctx -> { ctx.fetch( "CREATE TABLE id_and_name(id NUMERIC(20, 10) NOT NULL, name VARCHAR(200) NOT NULL, power double precision NOT NULL, PRIMARY KEY (id));"); @@ -458,6 +465,34 @@ void testReadSuccess() throws Exception { assertEquals(ASCII_MESSAGES, actualMessages); } + @Test + void testReadIncrementalSuccess() throws Exception { + final ConfiguredAirbyteCatalog configuredCatalog = + CONFIGURED_INCR_CATALOG + .withStreams(CONFIGURED_INCR_CATALOG.getStreams().stream().filter(s -> s.getStream().getName().equals(STREAM_NAME)).collect( + Collectors.toList())); + final Set actualMessages = MoreIterators.toSet(new PostgresSource().read(getConfig(PSQL_DB, dbName), configuredCatalog, null)); + setEmittedAtToNull(actualMessages); + + final List stateAfterFirstBatch = extractStateMessage(actualMessages); + + setEmittedAtToNull(actualMessages); + + // An extra state message is emitted, in addition to the record messages. + assertEquals(actualMessages.size(), ASCII_MESSAGES.size() + 1); + assertThat(actualMessages.contains(ASCII_MESSAGES)); + + final JsonNode state = Jsons.jsonNode(stateAfterFirstBatch); + + // Incremental sync should only read one new message (where id = 'NaN') + final Set nextSyncMessages = MoreIterators.toSet(new PostgresSource().read(getConfig(PSQL_DB, dbName), configuredCatalog, state)); + setEmittedAtToNull(nextSyncMessages); + + // An extra state message is emitted, in addition to the record messages. + assertEquals(nextSyncMessages.size(), 2); + assertThat(nextSyncMessages.contains(createRecord(STREAM_NAME, SCHEMA_NAME, map("id", null, "name", "piccolo", "power", null)))); + } + @Test void testIsCdc() { final JsonNode config = getConfig(PSQL_DB, dbName); @@ -579,9 +614,9 @@ private ConfiguredAirbyteStream createTableWithNullValueCursor(final Database da .withDestinationSyncMode(DestinationSyncMode.APPEND) .withSyncMode(SyncMode.INCREMENTAL) .withStream(CatalogHelpers.createAirbyteStream( - "test_table_null_cursor", - "public", - Field.of("id", JsonSchemaType.STRING)) + "test_table_null_cursor", + "public", + Field.of("id", JsonSchemaType.STRING)) .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) .withSourceDefinedPrimaryKey(List.of(List.of("id")))); @@ -613,10 +648,10 @@ private ConfiguredAirbyteStream createViewWithNullValueCursor(final Database dat database.query(ctx -> { ctx.fetch("CREATE TABLE IF NOT EXISTS public.test_table_null_cursor(id INTEGER NULL)"); ctx.fetch(""" - CREATE VIEW test_view_null_cursor(id) as - SELECT test_table_null_cursor.id - FROM test_table_null_cursor - """); + CREATE VIEW test_view_null_cursor(id) as + SELECT test_table_null_cursor.id + FROM test_table_null_cursor + """); ctx.fetch("INSERT INTO public.test_table_null_cursor(id) VALUES (1), (2), (NULL)"); return null; }); @@ -626,12 +661,34 @@ CREATE VIEW test_view_null_cursor(id) as .withDestinationSyncMode(DestinationSyncMode.APPEND) .withSyncMode(SyncMode.INCREMENTAL) .withStream(CatalogHelpers.createAirbyteStream( - "test_view_null_cursor", - "public", - Field.of("id", JsonSchemaType.STRING)) + "test_view_null_cursor", + "public", + Field.of("id", JsonSchemaType.STRING)) .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) .withSourceDefinedPrimaryKey(List.of(List.of("id")))); } + private static List extractStateMessage(final Set messages) { + return messages.stream().filter(r -> r.getType() == Type.STATE).map(AirbyteMessage::getState) + .collect(Collectors.toList()); + } + + private static ConfiguredAirbyteCatalog toIncrementalConfiguredCatalog(final AirbyteCatalog catalog) { + return new ConfiguredAirbyteCatalog() + .withStreams(catalog.getStreams() + .stream() + .map(s -> toIncrementalConfiguredStream(s)) + .toList()); + } + + private static ConfiguredAirbyteStream toIncrementalConfiguredStream(final AirbyteStream stream) { + return new ConfiguredAirbyteStream() + .withStream(stream) + .withSyncMode(SyncMode.INCREMENTAL) + .withCursorField(List.of("id")) + .withDestinationSyncMode(DestinationSyncMode.APPEND) + .withPrimaryKey(new ArrayList<>()); + } + } diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java index 595e55da2c80..290676f1eb27 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java @@ -33,6 +33,7 @@ import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.AirbyteConnectionStatus; import io.airbyte.protocol.models.AirbyteConnectionStatus.Status; +import io.airbyte.protocol.models.AirbyteEstimateTraceMessage; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteMessage.Type; import io.airbyte.protocol.models.AirbyteRecordMessage; @@ -182,7 +183,9 @@ public AutoCloseableIterator read(final JsonNode config, private void validateCursorFieldForIncrementalTables( final Map>> tableNameToTable, - final ConfiguredAirbyteCatalog catalog, final Database database) throws SQLException { + final ConfiguredAirbyteCatalog catalog, + final Database database) + throws SQLException { final List tablesWithInvalidCursor = new ArrayList<>(); final List tablesWithInvalidCursorToWarnAbout = new ArrayList<>(); for (final ConfiguredAirbyteStream airbyteStream : catalog.getStreams()) { @@ -235,14 +238,41 @@ private void validateCursorFieldForIncrementalTables( /** * Verify that cursor column allows syncing to go through. + * * @param database database * @return true if syncing can go through. false otherwise * @throws SQLException exception */ - protected boolean verifyCursorColumnValues(final Database database, final String schema, final String tableName, final String columnName) throws SQLException { + protected boolean verifyCursorColumnValues(final Database database, final String schema, final String tableName, final String columnName) + throws SQLException { /* no-op */ return true; } + + /** + * Estimates the total volume (rows and bytes) to sync and emits a + * {@link AirbyteEstimateTraceMessage} associated with the full refresh stream. + * + * @param database database + */ + protected void estimateFullRefreshSyncSize(final Database database, + final ConfiguredAirbyteStream configuredAirbyteStream) { + /* no-op */ + } + + /** + * Estimates the total volume (rows and bytes) to sync and emits a + * {@link AirbyteEstimateTraceMessage} associated with an incremental stream. + * + * @param database database + */ + protected void estimateIncrementalSyncSize(final Database database, + final ConfiguredAirbyteStream configuredAirbyteStream, + final CursorInfo cursorInfo, + final DataType dataType) { + /* no-op */ + } + private List>> discoverWithoutSystemTables( final Database database) throws Exception { @@ -373,6 +403,7 @@ private AutoCloseableIterator createReadIterator(final Database emittedAt); } else { // if no cursor is present then this is the first read for is the same as doing a full refresh read. + estimateFullRefreshSyncSize(database, airbyteStream); airbyteMessageIterator = getFullRefreshStream(database, streamName, namespace, selectedDatabaseFields, table, emittedAt); } @@ -391,6 +422,7 @@ private AutoCloseableIterator createReadIterator(final Database getStateEmissionFrequency()), airbyteMessageIterator); } else if (airbyteStream.getSyncMode() == SyncMode.FULL_REFRESH) { + estimateFullRefreshSyncSize(database, airbyteStream); iterator = getFullRefreshStream(database, streamName, namespace, selectedDatabaseFields, table, emittedAt); } else if (airbyteStream.getSyncMode() == null) { @@ -440,6 +472,7 @@ private AutoCloseableIterator getIncrementalStream(final Databas table.getFields().stream().anyMatch(f -> f.getName().equals(cursorField)), String.format("Could not find cursor field %s in table %s", cursorField, table.getName())); + estimateIncrementalSyncSize(database, airbyteStream, cursorInfo, cursorType); final AutoCloseableIterator queryIterator = queryTableIncremental( database, selectedDatabaseFields, From 98e4c163cd7049bc623b6318ac2cc9c37897bdb3 Mon Sep 17 00:00:00 2001 From: Akash Kulkarni Date: Wed, 21 Dec 2022 15:53:40 -0800 Subject: [PATCH 02/14] Update PostgresQueryUtils.java --- .../integrations/source/postgres/PostgresQueryUtils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresQueryUtils.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresQueryUtils.java index a1b3b96c9fd9..2909ef33d3e4 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresQueryUtils.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresQueryUtils.java @@ -7,7 +7,7 @@ import java.sql.SQLException; /** - * Utility class to define constants related to querying postgres. + * Utility class to define constants related to querying postgres */ public class PostgresQueryUtils { From bac093369fa308d781274876a110cef4f2a8b0fa Mon Sep 17 00:00:00 2001 From: Akash Kulkarni Date: Wed, 21 Dec 2022 16:03:34 -0800 Subject: [PATCH 03/14] Remaining merge conflicts --- .../integrations/base/AirbyteTraceMessageUtility.java | 1 + .../integrations/source/postgres/PostgresQueryUtils.java | 1 - .../integrations/source/postgres/PostgresSource.java | 8 +++++--- .../integrations/source/postgres/PostgresSourceTest.java | 2 ++ 4 files changed, 8 insertions(+), 4 deletions(-) diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/AirbyteTraceMessageUtility.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/AirbyteTraceMessageUtility.java index 780fae4dd0f7..38fa74d890ba 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/AirbyteTraceMessageUtility.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/AirbyteTraceMessageUtility.java @@ -6,6 +6,7 @@ import io.airbyte.protocol.models.v0.AirbyteErrorTraceMessage; import io.airbyte.protocol.models.v0.AirbyteErrorTraceMessage.FailureType; +import io.airbyte.protocol.models.v0.AirbyteEstimateTraceMessage; import io.airbyte.protocol.models.v0.AirbyteMessage; import io.airbyte.protocol.models.v0.AirbyteMessage.Type; import io.airbyte.protocol.models.v0.AirbyteTraceMessage; diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresQueryUtils.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresQueryUtils.java index 2909ef33d3e4..7f2629333ae2 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresQueryUtils.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresQueryUtils.java @@ -36,7 +36,6 @@ public class PostgresQueryUtils { public static final String TOTAL_BYTES_RESULT_COL = "totalbytes"; - public static String getFullyQualifiedTableNameWithQuoting(final String identifierQuoteString, final String schemaName, final String tableName) diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java index 4f8c9a1a1f84..8f3bfa922aed 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java @@ -56,6 +56,7 @@ import io.airbyte.protocol.models.v0.AirbyteCatalog; import io.airbyte.protocol.models.v0.AirbyteConnectionStatus; import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status; +import io.airbyte.protocol.models.v0.AirbyteEstimateTraceMessage.Type; import io.airbyte.protocol.models.v0.AirbyteGlobalState; import io.airbyte.protocol.models.v0.AirbyteMessage; import io.airbyte.protocol.models.v0.AirbyteStateMessage; @@ -605,9 +606,9 @@ private List getFullTableEstimate(final JdbcDatabase database, final S } private long getIncrementalTableRowCount(final JdbcDatabase database, - final String fullTableName, - final CursorInfo cursorInfo, - final PostgresType cursorFieldType) { + final String fullTableName, + final CursorInfo cursorInfo, + final PostgresType cursorFieldType) { try { final String quotedCursorField = enquoteIdentifier(getQuoteString(), cursorInfo.getCursorField()); @@ -647,4 +648,5 @@ private long getIncrementalTableRowCount(final JdbcDatabase database, throw new ConfigErrorException("Error occurred while attempting to estimate sync size", e); } } + } diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java index 1e79c65efb12..4188641b58e6 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java @@ -31,6 +31,8 @@ import io.airbyte.protocol.models.JsonSchemaType; import io.airbyte.protocol.models.v0.AirbyteCatalog; import io.airbyte.protocol.models.v0.AirbyteMessage; +import io.airbyte.protocol.models.v0.AirbyteMessage.Type; +import io.airbyte.protocol.models.v0.AirbyteStateMessage; import io.airbyte.protocol.models.v0.AirbyteStream; import io.airbyte.protocol.models.v0.CatalogHelpers; import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; From 838bb698751648b328955ab09c4b158837eaae7d Mon Sep 17 00:00:00 2001 From: Akash Kulkarni Date: Thu, 22 Dec 2022 09:50:41 -0800 Subject: [PATCH 04/14] Code cleanip --- .../integrations/source/postgres/PostgresSourceTest.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java index 4188641b58e6..8903a1457b9e 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java @@ -127,8 +127,6 @@ class PostgresSourceTest { private String dbName; - private Database database; - @BeforeAll static void init() { PSQL_DB = new PostgreSQLContainer<>("postgres:13-alpine"); @@ -146,7 +144,7 @@ void setup() throws Exception { final JsonNode config = getConfig(PSQL_DB, dbName); try (final DSLContext dslContext = getDslContext(config)) { - this.database = getDatabase(dslContext); + final Database database = getDatabase(dslContext); database.query(ctx -> { ctx.fetch( "CREATE TABLE id_and_name(id NUMERIC(20, 10) NOT NULL, name VARCHAR(200) NOT NULL, power double precision NOT NULL, PRIMARY KEY (id));"); From e6fce719a95879a8bf6463ab9defbefaeb11a4ff Mon Sep 17 00:00:00 2001 From: Akash Kulkarni Date: Thu, 29 Dec 2022 11:07:34 -0800 Subject: [PATCH 05/14] Address comments --- .../java/io/airbyte/db/jdbc/JdbcUtils.java | 5 +++++ .../source/postgres/PostgresQueryUtils.java | 4 ++-- .../source/postgres/PostgresSource.java | 19 +++++++++++-------- .../source/postgres/PostgresUtils.java | 1 + 4 files changed, 19 insertions(+), 10 deletions(-) diff --git a/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/JdbcUtils.java b/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/JdbcUtils.java index 5529b35668c4..b6d2ef626957 100644 --- a/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/JdbcUtils.java +++ b/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/JdbcUtils.java @@ -54,6 +54,11 @@ public class JdbcUtils { public static final String MODE_KEY = "mode"; public static final String AMPERSAND = "&"; public static final String EQUALS = "="; + + // An estimate for how much additional data in sent over the wire due to conversion of source data into {@link AirbyteMessage}. This is due to + // the fact that records are in JSON format and all database fields are converted to Strings. Currently, this is used in the logic for emitting + // estimate trace messages. + public static final int PLATFORM_DATA_INCREASE_FACTOR = 2; public static final Set ALLOWED_CURSOR_TYPES = Set.of(TIMESTAMP, TIME, DATE, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, REAL, NUMERIC, DECIMAL, NVARCHAR, VARCHAR, LONGVARCHAR); private static final JdbcSourceOperations defaultSourceOperations = new JdbcSourceOperations(); diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresQueryUtils.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresQueryUtils.java index 7f2629333ae2..5ba9af855502 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresQueryUtils.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresQueryUtils.java @@ -11,14 +11,14 @@ */ public class PostgresQueryUtils { - public static final String NULL_CURSOR_VALUE_WITH_SCHEMA = + public static final String NULL_CURSOR_VALUE_WITH_SCHEMA_QUERY = """ SELECT (EXISTS (SELECT FROM information_schema.columns WHERE table_schema = '%s' AND table_name = '%s' AND is_nullable = 'YES' AND column_name = '%s')) AND (EXISTS (SELECT from %s.\"%s\" where \"%s\" IS NULL LIMIT 1)) AS %s """; - public static final String NULL_CURSOR_VALUE_NO_SCHEMA = + public static final String NULL_CURSOR_VALUE_NO_SCHEMA_QUERY = """ SELECT (EXISTS (SELECT FROM information_schema.columns WHERE table_name = '%s' AND is_nullable = 'YES' AND column_name = '%s')) diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java index 8f3bfa922aed..ffce0448ed23 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java @@ -6,10 +6,11 @@ import static io.airbyte.db.jdbc.JdbcUtils.AMPERSAND; import static io.airbyte.db.jdbc.JdbcUtils.EQUALS; +import static io.airbyte.db.jdbc.JdbcUtils.PLATFORM_DATA_INCREASE_FACTOR; import static io.airbyte.integrations.debezium.AirbyteDebeziumHandler.shouldUseCDC; import static io.airbyte.integrations.source.jdbc.JdbcSSLConnectionUtils.PARAM_CA_CERTIFICATE; -import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.NULL_CURSOR_VALUE_NO_SCHEMA; -import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.NULL_CURSOR_VALUE_WITH_SCHEMA; +import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.NULL_CURSOR_VALUE_NO_SCHEMA_QUERY; +import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.NULL_CURSOR_VALUE_WITH_SCHEMA_QUERY; import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.ROW_COUNT_RESULT_COL; import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.TABLE_ESTIMATE_QUERY; import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.TOTAL_BYTES_RESULT_COL; @@ -521,10 +522,10 @@ protected boolean verifyCursorColumnValues(final JdbcDatabase database, final St final String resultColName = "nullValue"; // Query: Only if cursor column allows null values, query whether it contains one if (StringUtils.isNotBlank(schema)) { - query = String.format(NULL_CURSOR_VALUE_WITH_SCHEMA, + query = String.format(NULL_CURSOR_VALUE_WITH_SCHEMA_QUERY, schema, tableName, columnName, schema, tableName, columnName, resultColName); } else { - query = String.format(NULL_CURSOR_VALUE_NO_SCHEMA, + query = String.format(NULL_CURSOR_VALUE_NO_SCHEMA_QUERY, tableName, columnName, tableName, columnName, resultColName); } LOGGER.debug("null value query: {}", query); @@ -554,8 +555,9 @@ protected void estimateFullRefreshSyncSize(final JdbcDatabase database, // is to // read a row and Stringify it to better understand the accurate volume of data sent over the wire. // However, this approach doesn't account for different row sizes. - AirbyteTraceMessageUtility.emitEstimateTrace(2 * syncByteCount, Type.STREAM, syncRowCount, tableName, schemaName); - LOGGER.info("estimate for table: " + fullTableName + " : {total_bytes:" + syncByteCount + ", row_count : " + syncRowCount); + AirbyteTraceMessageUtility.emitEstimateTrace(PLATFORM_DATA_INCREASE_FACTOR * syncByteCount, Type.STREAM, syncRowCount, tableName, schemaName); + LOGGER.info(String.format("Estimate for table: %s : {sync_row_count: %s, sync_bytes: %s, total_table_row_count: %s, total_table_bytes: %s}", + fullTableName, syncRowCount, syncByteCount, syncRowCount, syncByteCount)); } catch (final SQLException e) { throw new ConfigErrorException("Error occurred while attempting to estimate sync size", e); } @@ -587,8 +589,9 @@ protected void estimateIncrementalSyncSize(final JdbcDatabase database, // is to // read a row and Stringify it to better understand the accurate volume of data sent over the wire. // However, this approach doesn't account for different row sizes - AirbyteTraceMessageUtility.emitEstimateTrace(2 * syncByteCount, Type.STREAM, syncRowCount, tableName, schemaName); - LOGGER.info("estimate for table: " + fullTableName + " : {row_count:" + syncRowCount + ", total_bytes : " + syncByteCount); + AirbyteTraceMessageUtility.emitEstimateTrace(PLATFORM_DATA_INCREASE_FACTOR * syncByteCount, Type.STREAM, syncRowCount, tableName, schemaName); + LOGGER.info(String.format("Estimate for table: %s : {sync_row_count: %s, sync_bytes: %s, total_table_row_count: %s, total_table_bytes: %s}", + fullTableName, syncRowCount, syncByteCount, tableRowCount, tableRowCount)); } catch (final SQLException e) { throw new ConfigErrorException("Error occurred while attempting to estimate sync size", e); } diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresUtils.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresUtils.java index ebc2f1f9870b..265c35d14b51 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresUtils.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresUtils.java @@ -42,6 +42,7 @@ public class PostgresUtils { public static final Duration MAX_FIRST_RECORD_WAIT_TIME = Duration.ofMinutes(20); public static final Duration DEFAULT_FIRST_RECORD_WAIT_TIME = Duration.ofMinutes(5); + public static String getPluginValue(final JsonNode field) { return field.has("plugin") ? field.get("plugin").asText() : PGOUTPUT_PLUGIN; } From c1555d4453366fd3bcd3c641749cd015cfcb77b6 Mon Sep 17 00:00:00 2001 From: Akash Kulkarni Date: Thu, 29 Dec 2022 11:37:39 -0800 Subject: [PATCH 06/14] Formatting --- .../db-lib/src/main/java/io/airbyte/db/jdbc/JdbcUtils.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/JdbcUtils.java b/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/JdbcUtils.java index b6d2ef626957..7fe8673241f2 100644 --- a/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/JdbcUtils.java +++ b/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/JdbcUtils.java @@ -55,8 +55,10 @@ public class JdbcUtils { public static final String AMPERSAND = "&"; public static final String EQUALS = "="; - // An estimate for how much additional data in sent over the wire due to conversion of source data into {@link AirbyteMessage}. This is due to - // the fact that records are in JSON format and all database fields are converted to Strings. Currently, this is used in the logic for emitting + // An estimate for how much additional data in sent over the wire due to conversion of source data + // into {@link AirbyteMessage}. This is due to + // the fact that records are in JSON format and all database fields are converted to Strings. + // Currently, this is used in the logic for emitting // estimate trace messages. public static final int PLATFORM_DATA_INCREASE_FACTOR = 2; public static final Set ALLOWED_CURSOR_TYPES = Set.of(TIMESTAMP, TIME, DATE, TINYINT, SMALLINT, INTEGER, From 9e443d25047cc4e4812d710483c8fa7760961ef2 Mon Sep 17 00:00:00 2001 From: Akash Kulkarni Date: Wed, 4 Jan 2023 11:40:33 +0530 Subject: [PATCH 07/14] Cleanup --- .../source/postgres/PostgresQueryUtils.java | 14 -------------- .../source/postgres/PostgresSource.java | 12 ++++++------ 2 files changed, 6 insertions(+), 20 deletions(-) diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresQueryUtils.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresQueryUtils.java index 5ba9af855502..b7584605d63c 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresQueryUtils.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresQueryUtils.java @@ -4,8 +4,6 @@ package io.airbyte.integrations.source.postgres; -import java.sql.SQLException; - /** * Utility class to define constants related to querying postgres */ @@ -36,16 +34,4 @@ public class PostgresQueryUtils { public static final String TOTAL_BYTES_RESULT_COL = "totalbytes"; - public static String getFullyQualifiedTableNameWithQuoting(final String identifierQuoteString, - final String schemaName, - final String tableName) - throws SQLException { - final String quotedTableName = enquoteIdentifier(identifierQuoteString, tableName); - return schemaName != null ? enquoteIdentifier(identifierQuoteString, schemaName) + "." + quotedTableName : quotedTableName; - } - - public static String enquoteIdentifier(final String identifierQuoteString, final String identifier) { - return identifierQuoteString + identifier + identifierQuoteString; - } - } diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java index ffce0448ed23..1a54fb033ad9 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java @@ -14,8 +14,8 @@ import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.ROW_COUNT_RESULT_COL; import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.TABLE_ESTIMATE_QUERY; import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.TOTAL_BYTES_RESULT_COL; -import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.enquoteIdentifier; -import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.getFullyQualifiedTableNameWithQuoting; +import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.getFullyQualifiedTableNameWithQuoting; +import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.getIdentifierWithQuoting; import static io.airbyte.integrations.util.PostgresSslConnectionUtils.DISABLE; import static io.airbyte.integrations.util.PostgresSslConnectionUtils.PARAM_SSL_MODE; import static java.util.stream.Collectors.toList; @@ -544,7 +544,7 @@ protected void estimateFullRefreshSyncSize(final JdbcDatabase database, final String schemaName = configuredAirbyteStream.getStream().getNamespace(); final String tableName = configuredAirbyteStream.getStream().getName(); final String fullTableName = - getFullyQualifiedTableNameWithQuoting(getQuoteString(), schemaName, tableName); + getFullyQualifiedTableNameWithQuoting(schemaName, tableName, getQuoteString()); final List tableEstimateResult = getFullTableEstimate(database, fullTableName); @@ -572,7 +572,7 @@ protected void estimateIncrementalSyncSize(final JdbcDatabase database, final String schemaName = configuredAirbyteStream.getStream().getNamespace(); final String tableName = configuredAirbyteStream.getStream().getName(); final String fullTableName = - getFullyQualifiedTableNameWithQuoting(getQuoteString(), schemaName, tableName); + getFullyQualifiedTableNameWithQuoting(schemaName, tableName, getQuoteString()); final List tableEstimateResult = getFullTableEstimate(database, fullTableName); @@ -613,7 +613,7 @@ private long getIncrementalTableRowCount(final JdbcDatabase database, final CursorInfo cursorInfo, final PostgresType cursorFieldType) { try { - final String quotedCursorField = enquoteIdentifier(getQuoteString(), cursorInfo.getCursorField()); + final String quotedCursorField = getIdentifierWithQuoting(cursorInfo.getCursorField(), getQuoteString()); // Calculate actual number of rows to sync here. final List result = database.queryJsons( @@ -640,7 +640,7 @@ private long getIncrementalTableRowCount(final JdbcDatabase database, final PreparedStatement preparedStatement = connection.prepareStatement(sql.toString()); LOGGER.info("Executing query for table {}: {}", fullTableName, preparedStatement); - sourceOperations.setStatementField(preparedStatement, 1, cursorFieldType, cursorInfo.getCursor()); + sourceOperations.setCursorField(preparedStatement, 1, cursorFieldType, cursorInfo.getCursor()); return preparedStatement; }, resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet)); From 49b1447c6ed9a5f8c1b8d58632ad6cf77e39ed1a Mon Sep 17 00:00:00 2001 From: Akash Kulkarni Date: Thu, 5 Jan 2023 13:13:13 +0530 Subject: [PATCH 08/14] Addressing comments --- .../source/postgres/PostgresSource.java | 101 +++++++++--------- .../source/postgres/PostgresUtils.java | 1 - 2 files changed, 52 insertions(+), 50 deletions(-) diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java index d97a2dad8ad3..d2a3aab3609d 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java @@ -26,7 +26,6 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import io.airbyte.commons.exceptions.ConfigErrorException; import io.airbyte.commons.features.EnvVariableFeatureFlags; import io.airbyte.commons.features.FeatureFlags; import io.airbyte.commons.functional.CheckedConsumer; @@ -560,18 +559,21 @@ protected void estimateFullRefreshSyncSize(final JdbcDatabase database, final List tableEstimateResult = getFullTableEstimate(database, fullTableName); - final long syncRowCount = tableEstimateResult.get(0).get(ROW_COUNT_RESULT_COL).asLong(); - final long syncByteCount = tableEstimateResult.get(0).get(TOTAL_BYTES_RESULT_COL).asLong(); - - // Here, we double the bytes estimate to account for serialization. Perhaps a better way to do this - // is to - // read a row and Stringify it to better understand the accurate volume of data sent over the wire. - // However, this approach doesn't account for different row sizes. - AirbyteTraceMessageUtility.emitEstimateTrace(PLATFORM_DATA_INCREASE_FACTOR * syncByteCount, Type.STREAM, syncRowCount, tableName, schemaName); - LOGGER.info(String.format("Estimate for table: %s : {sync_row_count: %s, sync_bytes: %s, total_table_row_count: %s, total_table_bytes: %s}", - fullTableName, syncRowCount, syncByteCount, syncRowCount, syncByteCount)); + if (!tableEstimateResult.isEmpty() && tableEstimateResult.get(0).has(ROW_COUNT_RESULT_COL) && + tableEstimateResult.get(0).has(TOTAL_BYTES_RESULT_COL)) { + final long syncRowCount = tableEstimateResult.get(0).get(ROW_COUNT_RESULT_COL).asLong(); + final long syncByteCount = tableEstimateResult.get(0).get(TOTAL_BYTES_RESULT_COL).asLong(); + + // Here, we double the bytes estimate to account for serialization. Perhaps a better way to do this + // is to + // read a row and Stringify it to better understand the accurate volume of data sent over the wire. + // However, this approach doesn't account for different row sizes. + AirbyteTraceMessageUtility.emitEstimateTrace(PLATFORM_DATA_INCREASE_FACTOR * syncByteCount, Type.STREAM, syncRowCount, tableName, schemaName); + LOGGER.info(String.format("Estimate for table: %s : {sync_row_count: %s, sync_bytes: %s, total_table_row_count: %s, total_table_bytes: %s}", + fullTableName, syncRowCount, syncByteCount, syncRowCount, syncByteCount)); + } } catch (final SQLException e) { - throw new ConfigErrorException("Error occurred while attempting to estimate sync size", e); + LOGGER.warn("Error occurred while attempting to estimate sync size", e); } } @@ -595,7 +597,11 @@ protected void estimateIncrementalSyncSize(final JdbcDatabase database, final long syncByteCount; syncRowCount = getIncrementalTableRowCount(database, fullTableName, cursorInfo, cursorFieldType); - syncByteCount = (tableByteCount / tableRowCount) * syncRowCount; + if (tableRowCount == 0) { + syncByteCount = 0; + } else { + syncByteCount = (tableByteCount / tableRowCount) * syncRowCount; + } // Here, we double the bytes estimate to account for serialization. Perhaps a better way to do this // is to @@ -605,7 +611,7 @@ protected void estimateIncrementalSyncSize(final JdbcDatabase database, LOGGER.info(String.format("Estimate for table: %s : {sync_row_count: %s, sync_bytes: %s, total_table_row_count: %s, total_table_bytes: %s}", fullTableName, syncRowCount, syncByteCount, tableRowCount, tableRowCount)); } catch (final SQLException e) { - throw new ConfigErrorException("Error occurred while attempting to estimate sync size", e); + LOGGER.warn("Error occurred while attempting to estimate sync size", e); } } @@ -623,45 +629,42 @@ private List getFullTableEstimate(final JdbcDatabase database, final S private long getIncrementalTableRowCount(final JdbcDatabase database, final String fullTableName, final CursorInfo cursorInfo, - final PostgresType cursorFieldType) { - try { - final String quotedCursorField = getIdentifierWithQuoting(cursorInfo.getCursorField(), getQuoteString()); - - // Calculate actual number of rows to sync here. - final List result = database.queryJsons( - connection -> { - LOGGER.info("Preparing query for table: {}", fullTableName); - final String operator; - if (cursorInfo.getCursorRecordCount() <= 0L) { + final PostgresType cursorFieldType) + throws SQLException { + final String quotedCursorField = getIdentifierWithQuoting(cursorInfo.getCursorField(), getQuoteString()); + + // Calculate actual number of rows to sync here. + final List result = database.queryJsons( + connection -> { + LOGGER.info("Preparing query for table: {}", fullTableName); + final String operator; + if (cursorInfo.getCursorRecordCount() <= 0L) { + operator = ">"; + } else { + final long actualRecordCount = getActualCursorRecordCount( + connection, fullTableName, quotedCursorField, cursorFieldType, cursorInfo.getCursor()); + LOGGER.info("Table {} cursor count: expected {}, actual {}", fullTableName, cursorInfo.getCursorRecordCount(), actualRecordCount); + if (actualRecordCount == cursorInfo.getCursorRecordCount()) { operator = ">"; } else { - final long actualRecordCount = getActualCursorRecordCount( - connection, fullTableName, quotedCursorField, cursorFieldType, cursorInfo.getCursor()); - LOGGER.info("Table {} cursor count: expected {}, actual {}", fullTableName, cursorInfo.getCursorRecordCount(), actualRecordCount); - if (actualRecordCount == cursorInfo.getCursorRecordCount()) { - operator = ">"; - } else { - operator = ">="; - } + operator = ">="; } + } + + final StringBuilder sql = new StringBuilder(String.format("SELECT COUNT(*) FROM %s WHERE %s %s ?", + fullTableName, + quotedCursorField, + operator)); + + final PreparedStatement preparedStatement = connection.prepareStatement(sql.toString()); + LOGGER.info("Executing query for table {}: {}", fullTableName, preparedStatement); + sourceOperations.setCursorField(preparedStatement, 1, cursorFieldType, cursorInfo.getCursor()); + return preparedStatement; + }, + resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet)); - final StringBuilder sql = new StringBuilder(String.format("SELECT COUNT(*) FROM %s WHERE %s %s ?", - fullTableName, - quotedCursorField, - operator)); - - final PreparedStatement preparedStatement = connection.prepareStatement(sql.toString()); - LOGGER.info("Executing query for table {}: {}", fullTableName, preparedStatement); - sourceOperations.setCursorField(preparedStatement, 1, cursorFieldType, cursorInfo.getCursor()); - return preparedStatement; - }, - resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet)); - - Preconditions.checkState(result.size() == 1); - return result.get(0).get("count").asLong(); - } catch (final SQLException e) { - throw new ConfigErrorException("Error occurred while attempting to estimate sync size", e); - } + Preconditions.checkState(result.size() == 1); + return result.get(0).get("count").asLong(); } } diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresUtils.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresUtils.java index cc0c5e1b38b9..f7045bdd8c4e 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresUtils.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresUtils.java @@ -42,7 +42,6 @@ public class PostgresUtils { public static final Duration MAX_FIRST_RECORD_WAIT_TIME = Duration.ofMinutes(20); public static final Duration DEFAULT_FIRST_RECORD_WAIT_TIME = Duration.ofMinutes(5); - public static String getPluginValue(final JsonNode field) { return field.has("plugin") ? field.get("plugin").asText() : PGOUTPUT_PLUGIN; } From 4448d81b8adcb3daf7cf9709adf782dda2aa31bb Mon Sep 17 00:00:00 2001 From: Akash Kulkarni Date: Fri, 6 Jan 2023 13:49:10 +0530 Subject: [PATCH 09/14] Bump version + documentation --- airbyte-integrations/connectors/source-postgres/Dockerfile | 2 +- docs/integrations/sources/postgres.md | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-postgres/Dockerfile b/airbyte-integrations/connectors/source-postgres/Dockerfile index 793e460515a3..6edcec59beb5 100644 --- a/airbyte-integrations/connectors/source-postgres/Dockerfile +++ b/airbyte-integrations/connectors/source-postgres/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=1.0.35 +LABEL io.airbyte.version=1.0.36 LABEL io.airbyte.name=airbyte/source-postgres diff --git a/docs/integrations/sources/postgres.md b/docs/integrations/sources/postgres.md index acb541a3ccd2..3f43918dd6bc 100644 --- a/docs/integrations/sources/postgres.md +++ b/docs/integrations/sources/postgres.md @@ -411,6 +411,7 @@ The root causes is that the WALs needed for the incremental sync has been remove | Version | Date | Pull Request | Subject | |:--------|:-----------|:----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 1.0.36 | 2023-01-06 | [20783](https://github.com/airbytehq/airbyte/pull/20783) | Emit estimate trace messages for non-CDC mode. | | 1.0.35 | 2023-01-04 | [20469](https://github.com/airbytehq/airbyte/pull/20469) | Introduce feature to make LSN commit behaviour configurable. | | 1.0.34 | 2022-12-13 | [20378](https://github.com/airbytehq/airbyte/pull/20378) | Improve descriptions | | 1.0.33 | 2022-12-12 | [18959](https://github.com/airbytehq/airbyte/pull/18959) | CDC : Don't timeout if snapshot is not complete. | From b63079e279806d9183504eed9a8dd745a399d2e8 Mon Sep 17 00:00:00 2001 From: Akash Kulkarni Date: Fri, 6 Jan 2023 13:50:28 +0530 Subject: [PATCH 10/14] Update strict-encrypt Dockerfile --- .../connectors/source-postgres-strict-encrypt/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile b/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile index f7c9b147afed..0978075777e9 100644 --- a/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile +++ b/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres-strict-encrypt COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=1.0.35 +LABEL io.airbyte.version=1.0.36 LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt From a4f3d24b2a8467950ba704a80a310b93a40b8827 Mon Sep 17 00:00:00 2001 From: Akash Kulkarni Date: Fri, 6 Jan 2023 13:54:39 +0530 Subject: [PATCH 11/14] Unpublish --- .../connectors/source-postgres-strict-encrypt/Dockerfile | 2 +- airbyte-integrations/connectors/source-postgres/Dockerfile | 2 +- docs/integrations/sources/postgres.md | 1 - 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile b/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile index 0978075777e9..f7c9b147afed 100644 --- a/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile +++ b/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres-strict-encrypt COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=1.0.36 +LABEL io.airbyte.version=1.0.35 LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt diff --git a/airbyte-integrations/connectors/source-postgres/Dockerfile b/airbyte-integrations/connectors/source-postgres/Dockerfile index 6edcec59beb5..793e460515a3 100644 --- a/airbyte-integrations/connectors/source-postgres/Dockerfile +++ b/airbyte-integrations/connectors/source-postgres/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=1.0.36 +LABEL io.airbyte.version=1.0.35 LABEL io.airbyte.name=airbyte/source-postgres diff --git a/docs/integrations/sources/postgres.md b/docs/integrations/sources/postgres.md index 3f43918dd6bc..acb541a3ccd2 100644 --- a/docs/integrations/sources/postgres.md +++ b/docs/integrations/sources/postgres.md @@ -411,7 +411,6 @@ The root causes is that the WALs needed for the incremental sync has been remove | Version | Date | Pull Request | Subject | |:--------|:-----------|:----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| 1.0.36 | 2023-01-06 | [20783](https://github.com/airbytehq/airbyte/pull/20783) | Emit estimate trace messages for non-CDC mode. | | 1.0.35 | 2023-01-04 | [20469](https://github.com/airbytehq/airbyte/pull/20469) | Introduce feature to make LSN commit behaviour configurable. | | 1.0.34 | 2022-12-13 | [20378](https://github.com/airbytehq/airbyte/pull/20378) | Improve descriptions | | 1.0.33 | 2022-12-12 | [18959](https://github.com/airbytehq/airbyte/pull/18959) | CDC : Don't timeout if snapshot is not complete. | From 25e8061733024bfa3f374697d3202f7e8b686f78 Mon Sep 17 00:00:00 2001 From: Akash Kulkarni Date: Mon, 16 Jan 2023 11:32:56 -0800 Subject: [PATCH 12/14] Merge conflicts --- .../integrations/source/postgres/PostgresQueryUtils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresQueryUtils.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresQueryUtils.java index b7584605d63c..23140901e3a7 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresQueryUtils.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresQueryUtils.java @@ -14,7 +14,7 @@ public class PostgresQueryUtils { SELECT (EXISTS (SELECT FROM information_schema.columns WHERE table_schema = '%s' AND table_name = '%s' AND is_nullable = 'YES' AND column_name = '%s')) AND - (EXISTS (SELECT from %s.\"%s\" where \"%s\" IS NULL LIMIT 1)) AS %s + (EXISTS (SELECT from \"%s\".\"%s\" where \"%s\" IS NULL LIMIT 1)) AS %s """; public static final String NULL_CURSOR_VALUE_NO_SCHEMA_QUERY = """ From 2f08fda3c0315cd53d80906eb85f822f43ba0d4a Mon Sep 17 00:00:00 2001 From: Akash Kulkarni Date: Tue, 17 Jan 2023 13:43:28 -0800 Subject: [PATCH 13/14] Update Dockerfile --- .../connectors/source-postgres-strict-encrypt/Dockerfile | 2 +- airbyte-integrations/connectors/source-postgres/Dockerfile | 2 +- docs/integrations/sources/postgres.md | 1 + 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile b/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile index 0978075777e9..df697ded6fa6 100644 --- a/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile +++ b/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres-strict-encrypt COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=1.0.36 +LABEL io.airbyte.version=1.0.37 LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt diff --git a/airbyte-integrations/connectors/source-postgres/Dockerfile b/airbyte-integrations/connectors/source-postgres/Dockerfile index 6edcec59beb5..291e5c2c7763 100644 --- a/airbyte-integrations/connectors/source-postgres/Dockerfile +++ b/airbyte-integrations/connectors/source-postgres/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=1.0.36 +LABEL io.airbyte.version=1.0.37 LABEL io.airbyte.name=airbyte/source-postgres diff --git a/docs/integrations/sources/postgres.md b/docs/integrations/sources/postgres.md index 7e401765c920..fb08e2c5bb24 100644 --- a/docs/integrations/sources/postgres.md +++ b/docs/integrations/sources/postgres.md @@ -411,6 +411,7 @@ The root causes is that the WALs needed for the incremental sync has been remove | Version | Date | Pull Request | Subject | |:--------|:-----------|:----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 1.0.37 | 2023-01-17 | [20783](https://github.com/airbytehq/airbyte/pull/20783) | Emit estimate trace messages for non-CDC mode. | | 1.0.36 | 2023-01-11 | [21003](https://github.com/airbytehq/airbyte/pull/21003) | Handle null values for array data types in CDC mode gracefully. | | 1.0.35 | 2023-01-04 | [20469](https://github.com/airbytehq/airbyte/pull/20469) | Introduce feature to make LSN commit behaviour configurable. | | 1.0.34 | 2022-12-13 | [20378](https://github.com/airbytehq/airbyte/pull/20378) | Improve descriptions | From 40c2eb88a8c359fa92bee6ca2d9685b72de17388 Mon Sep 17 00:00:00 2001 From: Octavia Squidington III Date: Tue, 17 Jan 2023 23:11:27 +0000 Subject: [PATCH 14/14] auto-bump connector version --- .../init/src/main/resources/seed/source_definitions.yaml | 2 +- airbyte-config/init/src/main/resources/seed/source_specs.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 14e04f92aa75..8aa92e0e15bb 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -1338,7 +1338,7 @@ - name: Postgres sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750 dockerRepository: airbyte/source-postgres - dockerImageTag: 1.0.36 + dockerImageTag: 1.0.37 documentationUrl: https://docs.airbyte.com/integrations/sources/postgres icon: postgresql.svg sourceType: database diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index ecfbf1ca4330..a66aaf5307db 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -11395,7 +11395,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-postgres:1.0.36" +- dockerImage: "airbyte/source-postgres:1.0.37" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres" connectionSpecification: