Skip to content

Commit

Permalink
Source Postgres : Emit estimate trace messages for non-CDC mode (#20783)
Browse files Browse the repository at this point in the history
* Emit estimate trace messages

* Update PostgresQueryUtils.java

* Remaining merge conflicts

* Code cleanup

* Address comments

* Formatting

* Cleanup

* Addressing comments

* Bump version + documentation

* Update strict-encrypt Dockerfile

* Unpublish

* Merge conflicts

* Update Dockerfile

* auto-bump connector version

Co-authored-by: Octavia Squidington III <[email protected]>
  • Loading branch information
akashkulk and octavia-squidington-iii authored Jan 18, 2023
1 parent 5f29824 commit 63e4482
Show file tree
Hide file tree
Showing 12 changed files with 288 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1338,7 +1338,7 @@
- name: Postgres
sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerRepository: airbyte/source-postgres
dockerImageTag: 1.0.36
dockerImageTag: 1.0.37
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
icon: postgresql.svg
sourceType: database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11395,7 +11395,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-postgres:1.0.36"
- dockerImage: "airbyte/source-postgres:1.0.37"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,13 @@ public class JdbcUtils {
public static final String MODE_KEY = "mode";
public static final String AMPERSAND = "&";
public static final String EQUALS = "=";

// An estimate of how much additional data is sent over the wire due to conversion of source data
// into {@link AirbyteMessage}. This is due to
// the fact that records are in JSON format and all database fields are converted to Strings.
// Currently, this is used in the logic for emitting
// estimate trace messages.
public static final int PLATFORM_DATA_INCREASE_FACTOR = 2;
public static final Set<JDBCType> ALLOWED_CURSOR_TYPES = Set.of(TIMESTAMP, TIME, DATE, TINYINT, SMALLINT, INTEGER,
BIGINT, FLOAT, DOUBLE, REAL, NUMERIC, DECIMAL, NVARCHAR, VARCHAR, LONGVARCHAR);
private static final JdbcSourceOperations defaultSourceOperations = new JdbcSourceOperations();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import io.airbyte.protocol.models.v0.AirbyteErrorTraceMessage;
import io.airbyte.protocol.models.v0.AirbyteErrorTraceMessage.FailureType;
import io.airbyte.protocol.models.v0.AirbyteEstimateTraceMessage;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteMessage.Type;
import io.airbyte.protocol.models.v0.AirbyteTraceMessage;
Expand All @@ -24,6 +25,21 @@ public static void emitConfigErrorTrace(final Throwable e, final String displayM
emitErrorTrace(e, displayMessage, FailureType.CONFIG_ERROR);
}

/**
 * Emits an ESTIMATE trace message describing the expected size of a sync.
 *
 * @param byteEstimate estimated number of bytes
 * @param type estimate type set on the estimate payload
 * @param rowEstimate estimated number of rows
 * @param streamName name set on the estimate payload
 * @param streamNamespace namespace set on the estimate payload
 */
public static void emitEstimateTrace(final long byteEstimate,
                                     final AirbyteEstimateTraceMessage.Type type,
                                     final long rowEstimate,
                                     final String streamName,
                                     final String streamNamespace) {
  // Create the enclosing trace message first, then the estimate payload it carries.
  final AirbyteTraceMessage estimateTrace = makeAirbyteTraceMessage(AirbyteTraceMessage.Type.ESTIMATE);

  final AirbyteEstimateTraceMessage estimatePayload = new AirbyteEstimateTraceMessage()
      .withByteEstimate(byteEstimate)
      .withType(type)
      .withRowEstimate(rowEstimate)
      .withName(streamName)
      .withNamespace(streamNamespace);

  final AirbyteMessage wrapped = makeAirbyteMessageFromTraceMessage(estimateTrace.withEstimate(estimatePayload));
  emitMessage(wrapped);
}

/**
 * Builds an error trace message from the given throwable and emits it.
 *
 * @param e the throwable the trace message is built from
 * @param displayMessage message passed through to the error trace
 * @param failureType failure type passed through to the error trace
 */
public static void emitErrorTrace(final Throwable e, final String displayMessage, final FailureType failureType) {
  final AirbyteMessage errorTrace = makeErrorTraceAirbyteMessage(e, displayMessage, failureType);
  emitMessage(errorTrace);
}
Expand All @@ -35,10 +51,10 @@ public static void emitErrorTrace(final Throwable e, final String displayMessage
// public void emitNotificationTrace() {}
// public void emitMetricTrace() {}

private static void emitMessage(AirbyteMessage message) {
private static void emitMessage(final AirbyteMessage message) {
// Not sure why defaultOutputRecordCollector is under Destination specifically,
// but this matches usage elsewhere in base-java
Consumer<AirbyteMessage> outputRecordCollector = Destination::defaultOutputRecordCollector;
final Consumer<AirbyteMessage> outputRecordCollector = Destination::defaultOutputRecordCollector;
outputRecordCollector.accept(message);
}

Expand All @@ -56,7 +72,7 @@ private static AirbyteMessage makeErrorTraceAirbyteMessage(
.withStackTrace(ExceptionUtils.getStackTrace(e))));
}

private static AirbyteMessage makeAirbyteMessageFromTraceMessage(AirbyteTraceMessage airbyteTraceMessage) {
private static AirbyteMessage makeAirbyteMessageFromTraceMessage(final AirbyteTraceMessage airbyteTraceMessage) {
return new AirbyteMessage().withType(Type.TRACE).withTrace(airbyteTraceMessage);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,11 +401,11 @@ protected String getCountColumnName() {
return "record_count";
}

private long getActualCursorRecordCount(final Connection connection,
final String fullTableName,
final String quotedCursorField,
final Datatype cursorFieldType,
final String cursor)
protected long getActualCursorRecordCount(final Connection connection,
final String fullTableName,
final String quotedCursorField,
final Datatype cursorFieldType,
final String cursor)
throws SQLException {
final String columnName = getCountColumnName();
final PreparedStatement cursorRecordStatement;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.36
LABEL io.airbyte.version=1.0.37
LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-postgres/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.36
LABEL io.airbyte.version=1.0.37
LABEL io.airbyte.name=airbyte/source-postgres
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.postgres;

/**
 * Utility class to define constants related to querying postgres.
 *
 * <p>
 * The query constants are {@link String#format} templates; the placeholders of each one are
 * described on the constant itself. This class only holds constants and is not meant to be
 * instantiated or extended.
 */
public final class PostgresQueryUtils {

  /**
   * Checks whether a nullable cursor column of a schema-qualified table actually contains a NULL.
   * Placeholders, in order: schema, table, column (as string literals), then schema, table, column
   * (as double-quoted identifiers), then the alias of the boolean result column.
   */
  public static final String NULL_CURSOR_VALUE_WITH_SCHEMA_QUERY =
      """
      SELECT
      (EXISTS (SELECT FROM information_schema.columns WHERE table_schema = '%s' AND table_name = '%s' AND is_nullable = 'YES' AND column_name = '%s'))
      AND
      (EXISTS (SELECT from \"%s\".\"%s\" where \"%s\" IS NULL LIMIT 1)) AS %s
      """;

  /**
   * Same check as {@link #NULL_CURSOR_VALUE_WITH_SCHEMA_QUERY} for a table referenced without a
   * schema. Placeholders, in order: table, column (as string literals), then table, column (as
   * double-quoted identifiers), then the alias of the boolean result column.
   */
  public static final String NULL_CURSOR_VALUE_NO_SCHEMA_QUERY =
      """
      SELECT
      (EXISTS (SELECT FROM information_schema.columns WHERE table_name = '%s' AND is_nullable = 'YES' AND column_name = '%s'))
      AND
      (EXISTS (SELECT from \"%s\" where \"%s\" IS NULL LIMIT 1)) AS %s
      """;

  /**
   * Returns the row count and the {@code pg_relation_size} of a table in a single row.
   * Placeholders, in order: table reference, row-count alias, table reference (embedded in a
   * single-quoted literal), byte-size alias.
   */
  public static final String TABLE_ESTIMATE_QUERY =
      """
      SELECT (SELECT COUNT(*) FROM %s) AS %s,
      pg_relation_size('%s') AS %s;
      """;

  /** Alias intended for the row-count column of {@link #TABLE_ESTIMATE_QUERY}. */
  public static final String ROW_COUNT_RESULT_COL = "rowcount";

  /** Alias intended for the byte-size column of {@link #TABLE_ESTIMATE_QUERY}. */
  public static final String TOTAL_BYTES_RESULT_COL = "totalbytes";

  private PostgresQueryUtils() {
    // Constants holder; prevent instantiation.
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,16 @@

import static io.airbyte.db.jdbc.JdbcUtils.AMPERSAND;
import static io.airbyte.db.jdbc.JdbcUtils.EQUALS;
import static io.airbyte.db.jdbc.JdbcUtils.PLATFORM_DATA_INCREASE_FACTOR;
import static io.airbyte.integrations.debezium.AirbyteDebeziumHandler.shouldUseCDC;
import static io.airbyte.integrations.source.jdbc.JdbcSSLConnectionUtils.PARAM_CA_CERTIFICATE;
import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.NULL_CURSOR_VALUE_NO_SCHEMA_QUERY;
import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.NULL_CURSOR_VALUE_WITH_SCHEMA_QUERY;
import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.ROW_COUNT_RESULT_COL;
import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.TABLE_ESTIMATE_QUERY;
import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.TOTAL_BYTES_RESULT_COL;
import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.getFullyQualifiedTableNameWithQuoting;
import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.getIdentifierWithQuoting;
import static io.airbyte.integrations.util.PostgresSslConnectionUtils.DISABLE;
import static io.airbyte.integrations.util.PostgresSslConnectionUtils.PARAM_SSL_MODE;
import static java.util.stream.Collectors.toList;
Expand All @@ -29,6 +37,7 @@
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.db.jdbc.streaming.AdaptiveStreamingQueryConfig;
import io.airbyte.integrations.base.AirbyteTraceMessageUtility;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.base.ssh.SshWrappedSource;
Expand All @@ -37,6 +46,7 @@
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
import io.airbyte.integrations.source.jdbc.JdbcSSLConnectionUtils;
import io.airbyte.integrations.source.jdbc.dto.JdbcPrivilegeDto;
import io.airbyte.integrations.source.relationaldb.CursorInfo;
import io.airbyte.integrations.source.relationaldb.TableInfo;
import io.airbyte.integrations.source.relationaldb.models.CdcState;
import io.airbyte.integrations.source.relationaldb.models.DbState;
Expand All @@ -46,6 +56,7 @@
import io.airbyte.protocol.models.v0.AirbyteCatalog;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.v0.AirbyteEstimateTraceMessage.Type;
import io.airbyte.protocol.models.v0.AirbyteGlobalState;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
Expand Down Expand Up @@ -93,20 +104,7 @@ public class PostgresSource extends AbstractJdbcSource<PostgresType> implements
public static final String SSL_KEY = "sslkey";
public static final String SSL_PASSWORD = "sslpassword";
public static final String MODE = "mode";
public static final String NULL_CURSOR_VALUE_WITH_SCHEMA =
"""
SELECT
(EXISTS (SELECT FROM information_schema.columns WHERE table_schema = '%s' AND table_name = '%s' AND is_nullable = 'YES' AND column_name = '%s'))
AND
(EXISTS (SELECT from \"%s\".\"%s\" where \"%s\" IS NULL LIMIT 1)) AS %s
""";
public static final String NULL_CURSOR_VALUE_NO_SCHEMA =
"""
SELECT
(EXISTS (SELECT FROM information_schema.columns WHERE table_name = '%s' AND is_nullable = 'YES' AND column_name = '%s'))
AND
(EXISTS (SELECT from \"%s\" where \"%s\" IS NULL LIMIT 1)) AS %s
""";

private List<String> schemas;
private final FeatureFlags featureFlags;
private static final Set<String> INVALID_CDC_SSL_MODES = ImmutableSet.of("allow", "prefer");
Expand Down Expand Up @@ -535,10 +533,10 @@ protected boolean verifyCursorColumnValues(final JdbcDatabase database, final St
final String resultColName = "nullValue";
// Query: Only if cursor column allows null values, query whether it contains one
if (StringUtils.isNotBlank(schema)) {
query = String.format(NULL_CURSOR_VALUE_WITH_SCHEMA,
query = String.format(NULL_CURSOR_VALUE_WITH_SCHEMA_QUERY,
schema, tableName, columnName, schema, tableName, columnName, resultColName);
} else {
query = String.format(NULL_CURSOR_VALUE_NO_SCHEMA,
query = String.format(NULL_CURSOR_VALUE_NO_SCHEMA_QUERY,
tableName, columnName, tableName, columnName, resultColName);
}
LOGGER.debug("null value query: {}", query);
Expand All @@ -550,4 +548,123 @@ protected boolean verifyCursorColumnValues(final JdbcDatabase database, final St
return !nullValExist;
}

/**
 * Emits an estimate trace message for a full refresh of the given stream, based on the table's row
 * count and relation size. Nothing is emitted when the estimate query yields no usable row; a
 * {@link SQLException} is logged as a warning and otherwise ignored.
 *
 * @param database database used to run the estimate query
 * @param configuredAirbyteStream stream whose namespace and name identify the table
 */
@Override
protected void estimateFullRefreshSyncSize(final JdbcDatabase database,
                                           final ConfiguredAirbyteStream configuredAirbyteStream) {
  try {
    final String schemaName = configuredAirbyteStream.getStream().getNamespace();
    final String tableName = configuredAirbyteStream.getStream().getName();
    final String fullTableName = getFullyQualifiedTableNameWithQuoting(schemaName, tableName, getQuoteString());

    final List<JsonNode> estimateRows = getFullTableEstimate(database, fullTableName);
    if (estimateRows.isEmpty()) {
      return;
    }

    final JsonNode estimateRow = estimateRows.get(0);
    if (!estimateRow.has(ROW_COUNT_RESULT_COL) || !estimateRow.has(TOTAL_BYTES_RESULT_COL)) {
      return;
    }

    // For a full refresh the whole table is synced, so the sync counts equal the table counts.
    final long syncRowCount = estimateRow.get(ROW_COUNT_RESULT_COL).asLong();
    final long syncByteCount = estimateRow.get(TOTAL_BYTES_RESULT_COL).asLong();

    // The byte estimate is scaled by PLATFORM_DATA_INCREASE_FACTOR to account for serialization.
    // Reading a row and stringifying it might give a more accurate picture of the volume sent over
    // the wire, but that approach would not account for different row sizes.
    final long emittedByteEstimate = PLATFORM_DATA_INCREASE_FACTOR * syncByteCount;
    AirbyteTraceMessageUtility.emitEstimateTrace(emittedByteEstimate, Type.STREAM, syncRowCount, tableName, schemaName);

    final String summary = String.format(
        "Estimate for table: %s : {sync_row_count: %s, sync_bytes: %s, total_table_row_count: %s, total_table_bytes: %s}",
        fullTableName, syncRowCount, syncByteCount, syncRowCount, syncByteCount);
    LOGGER.info(summary);
  } catch (final SQLException e) {
    LOGGER.warn("Error occurred while attempting to estimate sync size", e);
  }
}

/**
 * Emits an estimate trace message for an incremental sync of the given stream.
 *
 * <p>
 * The number of rows to sync is counted relative to the saved cursor; the byte estimate is the
 * table's average bytes per row multiplied by that row count. Nothing is emitted when the table
 * estimate query yields no usable row; a {@link SQLException} is logged as a warning and otherwise
 * ignored.
 *
 * @param database database used to run the estimate queries
 * @param configuredAirbyteStream stream whose namespace and name identify the table
 * @param cursorInfo cursor field, cursor value and cursor record count of the stream
 * @param cursorFieldType type of the cursor field, used to bind the cursor value
 */
@Override
protected void estimateIncrementalSyncSize(final JdbcDatabase database,
                                           final ConfiguredAirbyteStream configuredAirbyteStream,
                                           final CursorInfo cursorInfo,
                                           final PostgresType cursorFieldType) {
  try {
    final String schemaName = configuredAirbyteStream.getStream().getNamespace();
    final String tableName = configuredAirbyteStream.getStream().getName();
    final String fullTableName =
        getFullyQualifiedTableNameWithQuoting(schemaName, tableName, getQuoteString());

    final List<JsonNode> tableEstimateResult = getFullTableEstimate(database, fullTableName);

    // Same guard as estimateFullRefreshSyncSize: without both result columns there is nothing to
    // estimate from, and dereferencing a missing column would throw a NullPointerException.
    if (tableEstimateResult.isEmpty() || !tableEstimateResult.get(0).has(ROW_COUNT_RESULT_COL)
        || !tableEstimateResult.get(0).has(TOTAL_BYTES_RESULT_COL)) {
      return;
    }

    final long tableRowCount = tableEstimateResult.get(0).get(ROW_COUNT_RESULT_COL).asLong();
    final long tableByteCount = tableEstimateResult.get(0).get(TOTAL_BYTES_RESULT_COL).asLong();

    final long syncRowCount = getIncrementalTableRowCount(database, fullTableName, cursorInfo, cursorFieldType);

    // Average bytes per row (integer division) times the rows to sync; an empty table would
    // otherwise divide by zero.
    final long syncByteCount;
    if (tableRowCount == 0) {
      syncByteCount = 0;
    } else {
      syncByteCount = (tableByteCount / tableRowCount) * syncRowCount;
    }

    // The byte estimate is scaled by PLATFORM_DATA_INCREASE_FACTOR to account for serialization.
    // Reading a row and stringifying it might give a more accurate picture of the volume sent over
    // the wire, but that approach would not account for different row sizes.
    AirbyteTraceMessageUtility.emitEstimateTrace(PLATFORM_DATA_INCREASE_FACTOR * syncByteCount, Type.STREAM, syncRowCount, tableName, schemaName);
    // total_table_bytes must report the table's byte count, not its row count.
    LOGGER.info(String.format("Estimate for table: %s : {sync_row_count: %s, sync_bytes: %s, total_table_row_count: %s, total_table_bytes: %s}",
        fullTableName, syncRowCount, syncByteCount, tableRowCount, tableByteCount));
  } catch (final SQLException e) {
    LOGGER.warn("Error occurred while attempting to estimate sync size", e);
  }
}

/**
 * Runs {@code TABLE_ESTIMATE_QUERY} for the given table and returns its result rows as JSON.
 *
 * @param database database to run the query against
 * @param fullTableName table reference substituted into the query
 * @return the query result, checked to contain exactly one row
 * @throws SQLException if the query fails
 * @throws IllegalStateException if the query does not return exactly one row
 */
private List<JsonNode> getFullTableEstimate(final JdbcDatabase database, final String fullTableName) throws SQLException {
  // The table reference is used twice: once for COUNT(*) and once for pg_relation_size.
  final String estimateSql = String.format(
      TABLE_ESTIMATE_QUERY,
      fullTableName,
      ROW_COUNT_RESULT_COL,
      fullTableName,
      TOTAL_BYTES_RESULT_COL);
  LOGGER.debug("table estimate query: {}", estimateSql);

  final List<JsonNode> estimateRows = database.bufferedResultSetQuery(
      connection -> connection.createStatement().executeQuery(estimateSql),
      row -> JdbcUtils.getDefaultSourceOperations().rowToJson(row));

  Preconditions.checkState(estimateRows.size() == 1);
  return estimateRows;
}

/**
 * Counts the rows of a table that lie past the saved cursor.
 *
 * <p>
 * The comparison is strict ({@code >}) when no cursor record count was saved, or when the saved
 * count matches the actual number of records at the cursor value; otherwise it is inclusive
 * ({@code >=}).
 *
 * @param database database to run the count query against
 * @param fullTableName table reference used in the query
 * @param cursorInfo cursor field, cursor value and cursor record count
 * @param cursorFieldType type of the cursor field, used to bind the cursor value
 * @return the value of the {@code count} column of the single result row
 * @throws SQLException if the query fails
 * @throws IllegalStateException if the query does not return exactly one row
 */
private long getIncrementalTableRowCount(final JdbcDatabase database,
                                         final String fullTableName,
                                         final CursorInfo cursorInfo,
                                         final PostgresType cursorFieldType)
    throws SQLException {
  final String quotedCursorField = getIdentifierWithQuoting(cursorInfo.getCursorField(), getQuoteString());

  // Calculate actual number of rows to sync here.
  final List<JsonNode> countRows = database.queryJsons(
      connection -> {
        LOGGER.info("Preparing query for table: {}", fullTableName);

        // Strict comparison by default; switch to inclusive only when the saved record count
        // disagrees with what the table currently holds at the cursor value.
        String comparison = ">";
        if (cursorInfo.getCursorRecordCount() > 0L) {
          final long actualRecordCount = getActualCursorRecordCount(
              connection, fullTableName, quotedCursorField, cursorFieldType, cursorInfo.getCursor());
          LOGGER.info("Table {} cursor count: expected {}, actual {}", fullTableName, cursorInfo.getCursorRecordCount(), actualRecordCount);
          if (actualRecordCount != cursorInfo.getCursorRecordCount()) {
            comparison = ">=";
          }
        }

        final String countSql = String.format("SELECT COUNT(*) FROM %s WHERE %s %s ?",
            fullTableName,
            quotedCursorField,
            comparison);

        final PreparedStatement countStatement = connection.prepareStatement(countSql);
        LOGGER.info("Executing query for table {}: {}", fullTableName, countStatement);
        sourceOperations.setCursorField(countStatement, 1, cursorFieldType, cursorInfo.getCursor());
        return countStatement;
      },
      row -> JdbcUtils.getDefaultSourceOperations().rowToJson(row));

  Preconditions.checkState(countRows.size() == 1);
  final JsonNode countRow = countRows.get(0);
  return countRow.get("count").asLong();
}

}
Loading

0 comments on commit 63e4482

Please sign in to comment.