Skip to content

Commit

Permalink
Source Postgres : Fast query for estimate messages (#21683)
Browse files Browse the repository at this point in the history
* Source Postgres : Fast query for estimate messages

* Update documentation

* auto-bump connector version

* Update strict-encrypt Dockerfile

Co-authored-by: Octavia Squidington III <[email protected]>
  • Loading branch information
akashkulk and octavia-squidington-iii authored Jan 23, 2023
1 parent 0fbd3b2 commit 79de89d
Show file tree
Hide file tree
Showing 7 changed files with 149 additions and 134 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1338,7 +1338,7 @@
- name: Postgres
sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerRepository: airbyte/source-postgres
dockerImageTag: 1.0.38
dockerImageTag: 1.0.39
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
icon: postgresql.svg
sourceType: database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11488,7 +11488,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-postgres:1.0.38"
- dockerImage: "airbyte/source-postgres:1.0.39"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.38
LABEL io.airbyte.version=1.0.39
LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-postgres/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.38
LABEL io.airbyte.version=1.0.39
LABEL io.airbyte.name=airbyte/source-postgres
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class PostgresQueryUtils {

public static final String TABLE_ESTIMATE_QUERY =
"""
SELECT (SELECT COUNT(*) FROM %s) AS %s,
SELECT (select reltuples::int8 as count from pg_class c JOIN pg_catalog.pg_namespace n ON n.oid=c.relnamespace where nspname='%s' AND relname='%s') AS %s,
pg_relation_size('%s') AS %s;
""";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -557,13 +557,20 @@ protected void estimateFullRefreshSyncSize(final JdbcDatabase database,
final String fullTableName =
getFullyQualifiedTableNameWithQuoting(schemaName, tableName, getQuoteString());

final List<JsonNode> tableEstimateResult = getFullTableEstimate(database, fullTableName);
final List<JsonNode> tableEstimateResult = getFullTableEstimate(database, fullTableName, schemaName, tableName);

if (!tableEstimateResult.isEmpty() && tableEstimateResult.get(0).has(ROW_COUNT_RESULT_COL) &&
tableEstimateResult.get(0).has(TOTAL_BYTES_RESULT_COL)) {
final long syncRowCount = tableEstimateResult.get(0).get(ROW_COUNT_RESULT_COL).asLong();
final long syncByteCount = tableEstimateResult.get(0).get(TOTAL_BYTES_RESULT_COL).asLong();

// The fast count query can return negative or otherwise invalid results for small tables. In this
// case, we can skip emitting an
// estimate trace altogether since the sync will likely complete quickly.
if (syncRowCount <= 0) {
return;
}

// Here, we double the bytes estimate to account for serialization. Perhaps a better way to do this
// is to
// read a row and Stringify it to better understand the accurate volume of data sent over the wire.
Expand All @@ -588,20 +595,23 @@ protected void estimateIncrementalSyncSize(final JdbcDatabase database,
final String fullTableName =
getFullyQualifiedTableNameWithQuoting(schemaName, tableName, getQuoteString());

final List<JsonNode> tableEstimateResult = getFullTableEstimate(database, fullTableName);
final List<JsonNode> tableEstimateResult = getFullTableEstimate(database, fullTableName, schemaName, tableName);

final long tableRowCount = tableEstimateResult.get(0).get(ROW_COUNT_RESULT_COL).asLong();
final long tableByteCount = tableEstimateResult.get(0).get(TOTAL_BYTES_RESULT_COL).asLong();

// The fast count query can return negative or otherwise invalid results for small tables. In this
// case, we can skip emitting an
// estimate trace altogether since the sync will likely complete quickly.
if (tableRowCount <= 0) {
return;
}

final long syncRowCount;
final long syncByteCount;

syncRowCount = getIncrementalTableRowCount(database, fullTableName, cursorInfo, cursorFieldType);
if (tableRowCount == 0) {
syncByteCount = 0;
} else {
syncByteCount = (tableByteCount / tableRowCount) * syncRowCount;
}
syncByteCount = (tableByteCount / tableRowCount) * syncRowCount;

// Here, we double the bytes estimate to account for serialization. Perhaps a better way to do this
// is to
Expand All @@ -615,10 +625,14 @@ protected void estimateIncrementalSyncSize(final JdbcDatabase database,
}
}

private List<JsonNode> getFullTableEstimate(final JdbcDatabase database, final String fullTableName) throws SQLException {
private List<JsonNode> getFullTableEstimate(final JdbcDatabase database,
final String fullTableName,
final String schemaName,
final String tableName)
throws SQLException {
// Construct the table estimate query.
final String tableEstimateQuery =
String.format(TABLE_ESTIMATE_QUERY, fullTableName, ROW_COUNT_RESULT_COL, fullTableName, TOTAL_BYTES_RESULT_COL);
String.format(TABLE_ESTIMATE_QUERY, schemaName, tableName, ROW_COUNT_RESULT_COL, fullTableName, TOTAL_BYTES_RESULT_COL);
LOGGER.debug("table estimate query: {}", tableEstimateQuery);
final List<JsonNode> jsonNodes = database.bufferedResultSetQuery(conn -> conn.createStatement().executeQuery(tableEstimateQuery),
resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet));
Expand Down
Loading

0 comments on commit 79de89d

Please sign in to comment.