Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source Postgres : Emit estimate trace messages for non-CDC mode #20783

Merged
merged 28 commits into from
Jan 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
aa40c0c
Emit estimate trace messages
akashkulk Dec 21, 2022
0b72bde
Merge branch 'master' into message_bar
akashkulk Dec 21, 2022
98e4c16
Update PostgresQueryUtils.java
akashkulk Dec 21, 2022
bac0933
Remaining merge conflicts
akashkulk Dec 22, 2022
838bb69
Code cleanip
akashkulk Dec 22, 2022
e6fce71
Address comments
akashkulk Dec 29, 2022
c1555d4
Formatting
akashkulk Dec 29, 2022
7303650
Merge branch 'master' into message_bar
akashkulk Jan 3, 2023
c686aee
Merge branch 'master' into message_bar
akashkulk Jan 3, 2023
9e443d2
Cleanup
akashkulk Jan 4, 2023
9774447
Merge branch 'master' into message_bar
akashkulk Jan 5, 2023
49b1447
Addressing comments
akashkulk Jan 5, 2023
d47af6a
Merge branch 'master' into message_bar
akashkulk Jan 6, 2023
4448d81
Bump version + documentation
akashkulk Jan 6, 2023
b63079e
Update strict-encrypt Dockerfile
akashkulk Jan 6, 2023
a4f3d24
Unpublish
akashkulk Jan 6, 2023
c0c6061
Merge branch 'master' into message_bar
akashkulk Jan 6, 2023
d0b6352
Merge branch 'master' into message_bar
akashkulk Jan 16, 2023
25e8061
Merge conflicts
akashkulk Jan 16, 2023
85808ea
Merge branch 'master' into message_bar
akashkulk Jan 17, 2023
9e427c0
Merge branch 'master' into message_bar
akashkulk Jan 17, 2023
2f08fda
Update Dockerfile
akashkulk Jan 17, 2023
c2b4557
Merge branch 'master' into message_bar
akashkulk Jan 17, 2023
40c2eb8
auto-bump connector version
octavia-squidington-iii Jan 17, 2023
8dad719
Merge branch 'master' into message_bar
akashkulk Jan 17, 2023
48f1818
Merge branch 'master' into message_bar
akashkulk Jan 18, 2023
bc311af
Merge branch 'master' into message_bar
akashkulk Jan 18, 2023
9be7285
Merge branch 'master' into message_bar
akashkulk Jan 18, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1338,7 +1338,7 @@
- name: Postgres
sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerRepository: airbyte/source-postgres
dockerImageTag: 1.0.36
dockerImageTag: 1.0.37
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
icon: postgresql.svg
sourceType: database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11395,7 +11395,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-postgres:1.0.36"
- dockerImage: "airbyte/source-postgres:1.0.37"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,13 @@ public class JdbcUtils {
public static final String MODE_KEY = "mode";
public static final String AMPERSAND = "&";
public static final String EQUALS = "=";

// An estimate for how much additional data is sent over the wire due to conversion of source data
// into {@link AirbyteMessage}: records are serialized as JSON and all database fields are
// converted to Strings, roughly doubling the raw byte count.
// Currently this is only used in the logic for emitting estimate trace messages.
public static final int PLATFORM_DATA_INCREASE_FACTOR = 2;
// JDBC column types accepted as incremental-sync cursor columns.
public static final Set<JDBCType> ALLOWED_CURSOR_TYPES = Set.of(TIMESTAMP, TIME, DATE, TINYINT, SMALLINT, INTEGER,
BIGINT, FLOAT, DOUBLE, REAL, NUMERIC, DECIMAL, NVARCHAR, VARCHAR, LONGVARCHAR);
// Shared default JdbcSourceOperations instance (see getDefaultSourceOperations()).
private static final JdbcSourceOperations defaultSourceOperations = new JdbcSourceOperations();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import io.airbyte.protocol.models.v0.AirbyteErrorTraceMessage;
import io.airbyte.protocol.models.v0.AirbyteErrorTraceMessage.FailureType;
import io.airbyte.protocol.models.v0.AirbyteEstimateTraceMessage;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteMessage.Type;
import io.airbyte.protocol.models.v0.AirbyteTraceMessage;
Expand All @@ -24,6 +25,21 @@ public static void emitConfigErrorTrace(final Throwable e, final String displayM
emitErrorTrace(e, displayMessage, FailureType.CONFIG_ERROR);
}

/**
 * Emits an ESTIMATE trace message describing the expected size of an upcoming sync for a single
 * stream.
 *
 * @param byteEstimate estimated number of bytes the sync will emit
 * @param type the estimate scope (e.g. per-stream)
 * @param rowEstimate estimated number of rows the sync will emit
 * @param streamName name of the stream being estimated
 * @param streamNamespace namespace of the stream being estimated
 */
public static void emitEstimateTrace(final long byteEstimate,
                                     final AirbyteEstimateTraceMessage.Type type,
                                     final long rowEstimate,
                                     final String streamName,
                                     final String streamNamespace) {
  // Build the estimate payload first, then wrap it in a trace message and emit it.
  final AirbyteEstimateTraceMessage estimate = new AirbyteEstimateTraceMessage()
      .withByteEstimate(byteEstimate)
      .withType(type)
      .withRowEstimate(rowEstimate)
      .withName(streamName)
      .withNamespace(streamNamespace);
  final AirbyteTraceMessage traceMessage =
      makeAirbyteTraceMessage(AirbyteTraceMessage.Type.ESTIMATE).withEstimate(estimate);
  emitMessage(makeAirbyteMessageFromTraceMessage(traceMessage));
}

/**
 * Emits an ERROR trace message built from the given throwable, display message, and failure type.
 */
public static void emitErrorTrace(final Throwable e, final String displayMessage, final FailureType failureType) {
  final AirbyteMessage errorTraceMessage = makeErrorTraceAirbyteMessage(e, displayMessage, failureType);
  emitMessage(errorTraceMessage);
}
Expand All @@ -35,10 +51,10 @@ public static void emitErrorTrace(final Throwable e, final String displayMessage
// public void emitNotificationTrace() {}
// public void emitMetricTrace() {}

private static void emitMessage(AirbyteMessage message) {
private static void emitMessage(final AirbyteMessage message) {
// Not sure why defaultOutputRecordCollector is under Destination specifically,
// but this matches usage elsewhere in base-java
Consumer<AirbyteMessage> outputRecordCollector = Destination::defaultOutputRecordCollector;
final Consumer<AirbyteMessage> outputRecordCollector = Destination::defaultOutputRecordCollector;
outputRecordCollector.accept(message);
}

Expand All @@ -56,7 +72,7 @@ private static AirbyteMessage makeErrorTraceAirbyteMessage(
.withStackTrace(ExceptionUtils.getStackTrace(e))));
}

private static AirbyteMessage makeAirbyteMessageFromTraceMessage(AirbyteTraceMessage airbyteTraceMessage) {
private static AirbyteMessage makeAirbyteMessageFromTraceMessage(final AirbyteTraceMessage airbyteTraceMessage) {
return new AirbyteMessage().withType(Type.TRACE).withTrace(airbyteTraceMessage);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,11 +401,11 @@ protected String getCountColumnName() {
return "record_count";
}

private long getActualCursorRecordCount(final Connection connection,
final String fullTableName,
final String quotedCursorField,
final Datatype cursorFieldType,
final String cursor)
protected long getActualCursorRecordCount(final Connection connection,
final String fullTableName,
final String quotedCursorField,
final Datatype cursorFieldType,
final String cursor)
throws SQLException {
final String columnName = getCountColumnName();
final PreparedStatement cursorRecordStatement;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.36
LABEL io.airbyte.version=1.0.37
LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.36
LABEL io.airbyte.version=1.0.37
LABEL io.airbyte.name=airbyte/source-postgres
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.postgres;

/**
 * Utility class to define constants related to querying postgres.
 */
public class PostgresQueryUtils {

  // Returns true iff the cursor column (in the given schema + table) is declared nullable AND
  // currently contains at least one NULL value.
  // Format args: schema, table, column, schema, table, column, result-column alias.
  public static final String NULL_CURSOR_VALUE_WITH_SCHEMA_QUERY =
      """
      SELECT
        (EXISTS (SELECT FROM information_schema.columns WHERE table_schema = '%s' AND table_name = '%s' AND is_nullable = 'YES' AND column_name = '%s'))
      AND
        (EXISTS (SELECT from \"%s\".\"%s\" where \"%s\" IS NULL LIMIT 1)) AS %s
      """;

  // Same null-cursor check for tables addressed without an explicit schema.
  // Format args: table, column, table, column, result-column alias.
  public static final String NULL_CURSOR_VALUE_NO_SCHEMA_QUERY =
      """
      SELECT
        (EXISTS (SELECT FROM information_schema.columns WHERE table_name = '%s' AND is_nullable = 'YES' AND column_name = '%s'))
      AND
        (EXISTS (SELECT from \"%s\" where \"%s\" IS NULL LIMIT 1)) AS %s
      """;

  // Estimates a table's size: exact row count plus on-disk relation size in bytes.
  // NOTE(review): the COUNT(*) performs a full table scan and can take minutes on very large
  // tables; reviewers suggested the cheaper pg_class.reltuples estimate instead — confirm before
  // relying on this against big tables.
  // Format args: table, row-count alias, table, byte-count alias.
  public static final String TABLE_ESTIMATE_QUERY =
      """
      SELECT (SELECT COUNT(*) FROM %s) AS %s,
      pg_relation_size('%s') AS %s;
      """;

  // Result-set column aliases used with TABLE_ESTIMATE_QUERY.
  public static final String ROW_COUNT_RESULT_COL = "rowcount";

  public static final String TOTAL_BYTES_RESULT_COL = "totalbytes";

  // Utility class: prevent instantiation.
  private PostgresQueryUtils() {}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,16 @@

import static io.airbyte.db.jdbc.JdbcUtils.AMPERSAND;
import static io.airbyte.db.jdbc.JdbcUtils.EQUALS;
import static io.airbyte.db.jdbc.JdbcUtils.PLATFORM_DATA_INCREASE_FACTOR;
import static io.airbyte.integrations.debezium.AirbyteDebeziumHandler.shouldUseCDC;
import static io.airbyte.integrations.source.jdbc.JdbcSSLConnectionUtils.PARAM_CA_CERTIFICATE;
import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.NULL_CURSOR_VALUE_NO_SCHEMA_QUERY;
import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.NULL_CURSOR_VALUE_WITH_SCHEMA_QUERY;
import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.ROW_COUNT_RESULT_COL;
import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.TABLE_ESTIMATE_QUERY;
import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.TOTAL_BYTES_RESULT_COL;
import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.getFullyQualifiedTableNameWithQuoting;
import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.getIdentifierWithQuoting;
import static io.airbyte.integrations.util.PostgresSslConnectionUtils.DISABLE;
import static io.airbyte.integrations.util.PostgresSslConnectionUtils.PARAM_SSL_MODE;
import static java.util.stream.Collectors.toList;
Expand All @@ -29,6 +37,7 @@
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.db.jdbc.streaming.AdaptiveStreamingQueryConfig;
import io.airbyte.integrations.base.AirbyteTraceMessageUtility;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.base.ssh.SshWrappedSource;
Expand All @@ -37,6 +46,7 @@
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
import io.airbyte.integrations.source.jdbc.JdbcSSLConnectionUtils;
import io.airbyte.integrations.source.jdbc.dto.JdbcPrivilegeDto;
import io.airbyte.integrations.source.relationaldb.CursorInfo;
import io.airbyte.integrations.source.relationaldb.TableInfo;
import io.airbyte.integrations.source.relationaldb.models.CdcState;
import io.airbyte.integrations.source.relationaldb.models.DbState;
Expand All @@ -46,6 +56,7 @@
import io.airbyte.protocol.models.v0.AirbyteCatalog;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.v0.AirbyteEstimateTraceMessage.Type;
import io.airbyte.protocol.models.v0.AirbyteGlobalState;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
Expand Down Expand Up @@ -93,20 +104,7 @@ public class PostgresSource extends AbstractJdbcSource<PostgresType> implements
public static final String SSL_KEY = "sslkey";
public static final String SSL_PASSWORD = "sslpassword";
public static final String MODE = "mode";
public static final String NULL_CURSOR_VALUE_WITH_SCHEMA =
"""
SELECT
(EXISTS (SELECT FROM information_schema.columns WHERE table_schema = '%s' AND table_name = '%s' AND is_nullable = 'YES' AND column_name = '%s'))
AND
(EXISTS (SELECT from \"%s\".\"%s\" where \"%s\" IS NULL LIMIT 1)) AS %s
""";
public static final String NULL_CURSOR_VALUE_NO_SCHEMA =
"""
SELECT
(EXISTS (SELECT FROM information_schema.columns WHERE table_name = '%s' AND is_nullable = 'YES' AND column_name = '%s'))
AND
(EXISTS (SELECT from \"%s\" where \"%s\" IS NULL LIMIT 1)) AS %s
""";

private List<String> schemas;
private final FeatureFlags featureFlags;
private static final Set<String> INVALID_CDC_SSL_MODES = ImmutableSet.of("allow", "prefer");
Expand Down Expand Up @@ -535,10 +533,10 @@ protected boolean verifyCursorColumnValues(final JdbcDatabase database, final St
final String resultColName = "nullValue";
// Query: Only if cursor column allows null values, query whether it contains one
if (StringUtils.isNotBlank(schema)) {
query = String.format(NULL_CURSOR_VALUE_WITH_SCHEMA,
query = String.format(NULL_CURSOR_VALUE_WITH_SCHEMA_QUERY,
schema, tableName, columnName, schema, tableName, columnName, resultColName);
} else {
query = String.format(NULL_CURSOR_VALUE_NO_SCHEMA,
query = String.format(NULL_CURSOR_VALUE_NO_SCHEMA_QUERY,
tableName, columnName, tableName, columnName, resultColName);
}
LOGGER.debug("null value query: {}", query);
Expand All @@ -550,4 +548,123 @@ protected boolean verifyCursorColumnValues(final JdbcDatabase database, final St
return !nullValExist;
}

/**
 * Emits an ESTIMATE trace message for a full-refresh sync of the given stream. The estimate is
 * derived from an exact row count and the on-disk relation size; failures are logged and
 * swallowed so estimation never aborts a sync.
 */
@Override
protected void estimateFullRefreshSyncSize(final JdbcDatabase database,
                                           final ConfiguredAirbyteStream configuredAirbyteStream) {
  try {
    final String namespace = configuredAirbyteStream.getStream().getNamespace();
    final String name = configuredAirbyteStream.getStream().getName();
    final String qualifiedTableName =
        getFullyQualifiedTableNameWithQuoting(namespace, name, getQuoteString());

    final List<JsonNode> estimateRows = getFullTableEstimate(database, qualifiedTableName);

    // Bail out quietly when the estimate query returned nothing usable.
    if (estimateRows.isEmpty()) {
      return;
    }
    final JsonNode estimate = estimateRows.get(0);
    if (!estimate.has(ROW_COUNT_RESULT_COL) || !estimate.has(TOTAL_BYTES_RESULT_COL)) {
      return;
    }

    final long syncRowCount = estimate.get(ROW_COUNT_RESULT_COL).asLong();
    final long syncByteCount = estimate.get(TOTAL_BYTES_RESULT_COL).asLong();

    // The byte estimate is doubled to account for JSON serialization overhead. A more accurate
    // approach would be to stringify a sample row, but that would not capture varying row sizes.
    AirbyteTraceMessageUtility.emitEstimateTrace(PLATFORM_DATA_INCREASE_FACTOR * syncByteCount, Type.STREAM, syncRowCount, name, namespace);
    // For a full refresh the sync counts equal the table totals, hence the repeated args.
    LOGGER.info(String.format("Estimate for table: %s : {sync_row_count: %s, sync_bytes: %s, total_table_row_count: %s, total_table_bytes: %s}",
        qualifiedTableName, syncRowCount, syncByteCount, syncRowCount, syncByteCount));
  } catch (final SQLException e) {
    LOGGER.warn("Error occurred while attempting to estimate sync size", e);
  }
}

@Override
protected void estimateIncrementalSyncSize(final JdbcDatabase database,
final ConfiguredAirbyteStream configuredAirbyteStream,
final CursorInfo cursorInfo,
final PostgresType cursorFieldType) {
try {
final String schemaName = configuredAirbyteStream.getStream().getNamespace();
final String tableName = configuredAirbyteStream.getStream().getName();
final String fullTableName =
getFullyQualifiedTableNameWithQuoting(schemaName, tableName, getQuoteString());

final List<JsonNode> tableEstimateResult = getFullTableEstimate(database, fullTableName);

final long tableRowCount = tableEstimateResult.get(0).get(ROW_COUNT_RESULT_COL).asLong();
final long tableByteCount = tableEstimateResult.get(0).get(TOTAL_BYTES_RESULT_COL).asLong();
akashkulk marked this conversation as resolved.
Show resolved Hide resolved

final long syncRowCount;
final long syncByteCount;

syncRowCount = getIncrementalTableRowCount(database, fullTableName, cursorInfo, cursorFieldType);
if (tableRowCount == 0) {
syncByteCount = 0;
} else {
syncByteCount = (tableByteCount / tableRowCount) * syncRowCount;
}

// Here, we double the bytes estimate to account for serialization. Perhaps a better way to do this
// is to
// read a row and Stringify it to better understand the accurate volume of data sent over the wire.
// However, this approach doesn't account for different row sizes
AirbyteTraceMessageUtility.emitEstimateTrace(PLATFORM_DATA_INCREASE_FACTOR * syncByteCount, Type.STREAM, syncRowCount, tableName, schemaName);
LOGGER.info(String.format("Estimate for table: %s : {sync_row_count: %s, sync_bytes: %s, total_table_row_count: %s, total_table_bytes: %s}",
fullTableName, syncRowCount, syncByteCount, tableRowCount, tableRowCount));
} catch (final SQLException e) {
LOGGER.warn("Error occurred while attempting to estimate sync size", e);
}
}

/**
 * Runs the combined row-count / byte-size estimate query against the given table and returns the
 * single result row.
 *
 * @throws SQLException if the query fails
 */
private List<JsonNode> getFullTableEstimate(final JdbcDatabase database, final String fullTableName) throws SQLException {
  final String query =
      String.format(TABLE_ESTIMATE_QUERY, fullTableName, ROW_COUNT_RESULT_COL, fullTableName, TOTAL_BYTES_RESULT_COL);
  LOGGER.debug("table estimate query: {}", query);
  final List<JsonNode> rows = database.bufferedResultSetQuery(
      conn -> conn.createStatement().executeQuery(query),
      resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet));
  // The aggregate query must yield exactly one row.
  Preconditions.checkState(rows.size() == 1);
  return rows;
}

/**
 * Counts the rows that an incremental sync would read: rows whose cursor value is past the
 * current cursor position.
 *
 * <p>The comparison operator is chosen the same way the read path chooses it: when the recorded
 * cursor-record count matches the actual count in the table, rows exactly AT the cursor were
 * already synced and a strict {@code >} is used; otherwise {@code >=} re-includes them.
 *
 * @throws SQLException if any query fails
 */
private long getIncrementalTableRowCount(final JdbcDatabase database,
                                         final String fullTableName,
                                         final CursorInfo cursorInfo,
                                         final PostgresType cursorFieldType)
    throws SQLException {
  final String quotedCursorField = getIdentifierWithQuoting(cursorInfo.getCursorField(), getQuoteString());

  // Calculate actual number of rows to sync here.
  final List<JsonNode> result = database.queryJsons(
      connection -> {
        LOGGER.info("Preparing query for table: {}", fullTableName);
        final String operator;
        if (cursorInfo.getCursorRecordCount() <= 0L) {
          operator = ">";
        } else {
          final long actualRecordCount = getActualCursorRecordCount(
              connection, fullTableName, quotedCursorField, cursorFieldType, cursorInfo.getCursor());
          LOGGER.info("Table {} cursor count: expected {}, actual {}", fullTableName, cursorInfo.getCursorRecordCount(), actualRecordCount);
          if (actualRecordCount == cursorInfo.getCursorRecordCount()) {
            operator = ">";
          } else {
            operator = ">=";
          }
        }

        // String.format already produces the final SQL; no StringBuilder needed.
        final String sql = String.format("SELECT COUNT(*) FROM %s WHERE %s %s ?",
            fullTableName,
            quotedCursorField,
            operator);

        final PreparedStatement preparedStatement = connection.prepareStatement(sql);
        LOGGER.info("Executing query for table {}: {}", fullTableName, preparedStatement);
        sourceOperations.setCursorField(preparedStatement, 1, cursorFieldType, cursorInfo.getCursor());
        return preparedStatement;
      },
      resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet));

  // COUNT(*) must return exactly one row.
  Preconditions.checkState(result.size() == 1);
  return result.get(0).get("count").asLong();
}

}
Loading