🐞 Postgres source: fix first record wait time parsing bug #15273

Merged
merged 9 commits into from
Aug 4, 2022
@@ -762,7 +762,7 @@
- name: Postgres
sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerRepository: airbyte/source-postgres
-dockerImageTag: 0.4.41
+dockerImageTag: 0.4.42
documentationUrl: https://docs.airbyte.io/integrations/sources/postgres
icon: postgresql.svg
sourceType: database
7 changes: 4 additions & 3 deletions airbyte-config/init/src/main/resources/seed/source_specs.yaml
@@ -7140,7 +7140,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
-- dockerImage: "airbyte/source-postgres:0.4.41"
+- dockerImage: "airbyte/source-postgres:0.4.42"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
connectionSpecification:
@@ -7423,11 +7423,12 @@
title: "Initial Waiting Time in Seconds (Advanced)"
description: "The amount of time the connector will wait when it launches\
\ to figure out whether there is new data to sync or not. Default\
-\ to 5 minutes. For more information read about <a href=\"https://docs.airbyte.com/integrations/sources/postgres#initial-waiting-time\"\
+\ to 300 seconds. Valid range: 120 seconds to 1200 seconds. For\
+\ more information read about <a href=\"https://docs.airbyte.com/integrations/sources/postgres#initial-waiting-time\"\
>initial waiting time</a>."
default: 300
order: 4
-min: 30
+min: 120
max: 1200
tunnel_method:
type: "object"
@@ -16,5 +16,5 @@ ENV APPLICATION source-postgres-strict-encrypt

COPY --from=build /airbyte /airbyte

-LABEL io.airbyte.version=0.4.41
+LABEL io.airbyte.version=0.4.42
LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt
@@ -16,5 +16,5 @@ ENV APPLICATION source-postgres

COPY --from=build /airbyte /airbyte

-LABEL io.airbyte.version=0.4.41
+LABEL io.airbyte.version=0.4.42
LABEL io.airbyte.name=airbyte/source-postgres
@@ -262,6 +262,10 @@ public List<CheckedConsumer<JdbcDatabase, Exception>> getCheckOperations(final J
}

});

+checkOperations.add(database -> {
+  PostgresUtils.checkFirstRecordWaitTime(config);
+});
}

return checkOperations;
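
For illustration, the range check that this new check operation delegates to can be sketched as a standalone Java class. This is a minimal sketch under stated assumptions, not the connector's code: `WaitTimeCheckSketch`, `checkWaitSeconds`, and the explicit `isTest` parameter are hypothetical names, and the bounds of 2 and 20 minutes mirror the constants in this PR.

```java
import java.time.Duration;
import java.util.Optional;

// Hypothetical stand-in for the strict validation run during the connection check.
final class WaitTimeCheckSketch {

  static final Duration MIN_WAIT = Duration.ofMinutes(2);
  static final Duration MAX_WAIT = Duration.ofMinutes(20);

  // Throws when a configured value falls outside [MIN_WAIT, MAX_WAIT].
  // An absent value, or a test config, skips the check entirely.
  static void checkWaitSeconds(final Optional<Integer> seconds, final boolean isTest) {
    if (isTest || !seconds.isPresent()) {
      return;
    }
    final int value = seconds.get();
    if (value < MIN_WAIT.getSeconds() || value > MAX_WAIT.getSeconds()) {
      throw new IllegalArgumentException(String.format(
          "initial_waiting_seconds must be between %d and %d seconds",
          MIN_WAIT.getSeconds(), MAX_WAIT.getSeconds()));
    }
  }

  public static void main(final String[] args) {
    checkWaitSeconds(Optional.of(300), false); // within range, no exception
    checkWaitSeconds(Optional.of(5), true);    // test config, check skipped
    try {
      checkWaitSeconds(Optional.of(119), false);
    } catch (final IllegalArgumentException e) {
      // prints "initial_waiting_seconds must be between 120 and 1200 seconds"
      System.out.println(e.getMessage());
    }
  }
}
```

Failing at check time surfaces a bad value when the source is configured, rather than silently adjusting it once a sync is already running.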
@@ -6,6 +6,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import java.time.Duration;
+import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -15,6 +16,7 @@ public class PostgresUtils {

private static final String PGOUTPUT_PLUGIN = "pgoutput";

+public static final Duration MIN_FIRST_RECORD_WAIT_TIME = Duration.ofMinutes(2);
public static final Duration MAX_FIRST_RECORD_WAIT_TIME = Duration.ofMinutes(20);
public static final Duration DEFAULT_FIRST_RECORD_WAIT_TIME = Duration.ofMinutes(5);

@@ -30,19 +32,54 @@ public static boolean isCdc(final JsonNode config) {
return isCdc;
}

+  public static Optional<Integer> getFirstRecordWaitSeconds(final JsonNode config) {
+    final JsonNode replicationMethod = config.get("replication_method");
+    if (replicationMethod != null && replicationMethod.has("initial_waiting_seconds")) {
+      final int seconds = config.get("replication_method").get("initial_waiting_seconds").asInt();
+      return Optional.of(seconds);
+    }
+    return Optional.empty();
+  }
+
+  public static void checkFirstRecordWaitTime(final JsonNode config) {
+    // we need to skip the check because in tests, we set initial_waiting_seconds
+    // to 5 seconds for performance reasons, which is shorter than the minimum
+    // value allowed in production
+    if (config.has("is_test") && config.get("is_test").asBoolean()) {
+      return;
+    }
+
+    final Optional<Integer> firstRecordWaitSeconds = getFirstRecordWaitSeconds(config);
+    if (firstRecordWaitSeconds.isPresent()) {
+      final int seconds = firstRecordWaitSeconds.get();
+      if (seconds < MIN_FIRST_RECORD_WAIT_TIME.getSeconds() || seconds > MAX_FIRST_RECORD_WAIT_TIME.getSeconds()) {
+        throw new IllegalArgumentException(
+            String.format("initial_waiting_seconds must be between %d and %d seconds",
+                MIN_FIRST_RECORD_WAIT_TIME.getSeconds(), MAX_FIRST_RECORD_WAIT_TIME.getSeconds()));
+      }
+    }
+  }
+
   public static Duration getFirstRecordWaitTime(final JsonNode config) {
-    if (config.has("initial_waiting_seconds")) {
-      final int seconds = config.get("initial_waiting_seconds").asInt();
-      if (seconds > MAX_FIRST_RECORD_WAIT_TIME.getSeconds()) {
+    final boolean isTest = config.has("is_test") && config.get("is_test").asBoolean();
+    Duration firstRecordWaitTime = DEFAULT_FIRST_RECORD_WAIT_TIME;
+
+    final Optional<Integer> firstRecordWaitSeconds = getFirstRecordWaitSeconds(config);
+    if (firstRecordWaitSeconds.isPresent()) {
+      firstRecordWaitTime = Duration.ofSeconds(firstRecordWaitSeconds.get());
+      if (!isTest && firstRecordWaitTime.compareTo(MIN_FIRST_RECORD_WAIT_TIME) < 0) {
+        LOGGER.warn("First record waiting time is overridden to {} minutes, which is the min time allowed for safety.",
+            MIN_FIRST_RECORD_WAIT_TIME.toMinutes());
+        firstRecordWaitTime = MIN_FIRST_RECORD_WAIT_TIME;
+      } else if (!isTest && firstRecordWaitTime.compareTo(MAX_FIRST_RECORD_WAIT_TIME) > 0) {
         LOGGER.warn("First record waiting time is overridden to {} minutes, which is the max time allowed for safety.",
             MAX_FIRST_RECORD_WAIT_TIME.toMinutes());
-        return MAX_FIRST_RECORD_WAIT_TIME;
+        firstRecordWaitTime = MAX_FIRST_RECORD_WAIT_TIME;
       }
-
-      return Duration.ofSeconds(seconds);
     }

-    return DEFAULT_FIRST_RECORD_WAIT_TIME;
+    LOGGER.info("First record waiting time: {} seconds", firstRecordWaitTime.getSeconds());
+    return firstRecordWaitTime;
   }

}
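
The parsing fix comes down to where the setting is looked up: the old code read `initial_waiting_seconds` from the top level of the config, while the value is nested under `replication_method`. Below is a minimal sketch of the corrected lookup and the clamping that follows it. It uses a plain `Map` as a stand-in for Jackson's `JsonNode`, and `WaitTimeSketch`, `waitSeconds`, and `waitTime` are hypothetical names, not part of the connector.

```java
import java.time.Duration;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for the lookup and clamping logic, using Map instead of JsonNode.
final class WaitTimeSketch {

  static final Duration MIN_WAIT = Duration.ofMinutes(2);
  static final Duration MAX_WAIT = Duration.ofMinutes(20);
  static final Duration DEFAULT_WAIT = Duration.ofMinutes(5);

  // Reads replication_method.initial_waiting_seconds.
  // A top-level key with the same name is deliberately ignored.
  static Optional<Integer> waitSeconds(final Map<String, Object> config) {
    final Object method = config.get("replication_method");
    if (method instanceof Map) {
      final Object seconds = ((Map<?, ?>) method).get("initial_waiting_seconds");
      if (seconds instanceof Integer) {
        return Optional.of((Integer) seconds);
      }
    }
    return Optional.empty();
  }

  // Falls back to the default when unset; clamps to [MIN_WAIT, MAX_WAIT] unless is_test is true.
  static Duration waitTime(final Map<String, Object> config) {
    final boolean isTest = Boolean.TRUE.equals(config.get("is_test"));
    final Optional<Integer> seconds = waitSeconds(config);
    if (!seconds.isPresent()) {
      return DEFAULT_WAIT;
    }
    final Duration requested = Duration.ofSeconds(seconds.get());
    if (isTest) {
      return requested;
    }
    if (requested.compareTo(MIN_WAIT) < 0) {
      return MIN_WAIT;
    }
    if (requested.compareTo(MAX_WAIT) > 0) {
      return MAX_WAIT;
    }
    return requested;
  }

  public static void main(final String[] args) {
    final Map<String, Object> nested =
        Map.of("replication_method", Map.of("method", "CDC", "initial_waiting_seconds", 500));
    final Map<String, Object> topLevel = Map.of("initial_waiting_seconds", 500);
    System.out.println(waitTime(nested).getSeconds());   // prints 500
    System.out.println(waitTime(topLevel).getSeconds()); // prints 300, the default
  }
}
```

With the old top-level lookup, a config shaped like `nested` would never have matched, so the default would have been used regardless of what the user entered.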
@@ -270,10 +270,10 @@
"initial_waiting_seconds": {
"type": "integer",
"title": "Initial Waiting Time in Seconds (Advanced)",
-"description": "The amount of time the connector will wait when it launches to figure out whether there is new data to sync or not. Default to 5 minutes. For more information read about <a href=\"https://docs.airbyte.com/integrations/sources/postgres#initial-waiting-time\">initial waiting time</a>.",
+"description": "The amount of time the connector will wait when it launches to figure out whether there is new data to sync or not. Default to 300 seconds. Valid range: 120 seconds to 1200 seconds. For more information read about <a href=\"https://docs.airbyte.com/integrations/sources/postgres#initial-waiting-time\">initial waiting time</a>.",
"default": 300,
"order": 4,
-"min": 30,
+"min": 120,
"max": 1200
}
}
@@ -77,6 +77,7 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc
.put(JdbcUtils.PASSWORD_KEY, container.getPassword())
.put("replication_method", replicationMethod)
.put(JdbcUtils.SSL_KEY, false)
+.put("is_test", true)
.build());

try (final DSLContext dslContext = DSLContextFactory.create(
@@ -54,6 +54,7 @@ protected Database setupDatabase() throws Exception {
.put(JdbcUtils.USERNAME_KEY, container.getUsername())
.put(JdbcUtils.PASSWORD_KEY, container.getPassword())
.put("replication_method", replicationMethod)
+.put("is_test", true)
.put(JdbcUtils.SSL_KEY, false)
.build());

@@ -139,6 +139,7 @@ private JsonNode getConfig(final String dbName) {
.put(JdbcUtils.USERNAME_KEY, container.getUsername())
.put(JdbcUtils.PASSWORD_KEY, container.getPassword())
.put(JdbcUtils.SSL_KEY, false)
+.put("is_test", true)
.put("replication_method", replicationMethod)
.build());
}
@@ -91,6 +91,7 @@ private JsonNode getConfig(final PostgreSQLContainer<?> psqlDb, final String dbN
.put(JdbcUtils.USERNAME_KEY, psqlDb.getUsername())
.put(JdbcUtils.PASSWORD_KEY, psqlDb.getPassword())
.put(JdbcUtils.SSL_KEY, false)
+.put("is_test", true)
.build());
}

@@ -5,8 +5,11 @@
package io.airbyte.integrations.source.postgres;

import static io.airbyte.integrations.source.postgres.PostgresUtils.MAX_FIRST_RECORD_WAIT_TIME;
+import static io.airbyte.integrations.source.postgres.PostgresUtils.MIN_FIRST_RECORD_WAIT_TIME;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.fasterxml.jackson.databind.JsonNode;
@@ -16,6 +19,7 @@
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
+import java.util.Optional;
import org.junit.jupiter.api.Test;

class PostgresUtilsTest {
@@ -34,12 +38,28 @@ void testIsCdc() {
@Test
void testGetFirstRecordWaitTime() {
final JsonNode emptyConfig = Jsons.jsonNode(Collections.emptyMap());
+    assertDoesNotThrow(() -> PostgresUtils.checkFirstRecordWaitTime(emptyConfig));
+    assertEquals(Optional.empty(), PostgresUtils.getFirstRecordWaitSeconds(emptyConfig));
     assertEquals(PostgresUtils.DEFAULT_FIRST_RECORD_WAIT_TIME, PostgresUtils.getFirstRecordWaitTime(emptyConfig));

-    final JsonNode normalConfig = Jsons.jsonNode(Map.of("initial_waiting_seconds", 500));
+    final JsonNode normalConfig = Jsons.jsonNode(Map.of("replication_method",
+        Map.of("method", "CDC", "initial_waiting_seconds", 500)));
+    assertDoesNotThrow(() -> PostgresUtils.checkFirstRecordWaitTime(normalConfig));
+    assertEquals(Optional.of(500), PostgresUtils.getFirstRecordWaitSeconds(normalConfig));
     assertEquals(Duration.ofSeconds(500), PostgresUtils.getFirstRecordWaitTime(normalConfig));

-    final JsonNode tooLongConfig = Jsons.jsonNode(Map.of("initial_waiting_seconds", MAX_FIRST_RECORD_WAIT_TIME.getSeconds() + 100));
+    final int tooShortTimeout = (int) MIN_FIRST_RECORD_WAIT_TIME.getSeconds() - 1;
+    final JsonNode tooShortConfig = Jsons.jsonNode(Map.of("replication_method",
+        Map.of("method", "CDC", "initial_waiting_seconds", tooShortTimeout)));
+    assertThrows(IllegalArgumentException.class, () -> PostgresUtils.checkFirstRecordWaitTime(tooShortConfig));
+    assertEquals(Optional.of(tooShortTimeout), PostgresUtils.getFirstRecordWaitSeconds(tooShortConfig));
+    assertEquals(MIN_FIRST_RECORD_WAIT_TIME, PostgresUtils.getFirstRecordWaitTime(tooShortConfig));
+
+    final int tooLongTimeout = (int) MAX_FIRST_RECORD_WAIT_TIME.getSeconds() + 1;
+    final JsonNode tooLongConfig = Jsons.jsonNode(Map.of("replication_method",
+        Map.of("method", "CDC", "initial_waiting_seconds", tooLongTimeout)));
+    assertThrows(IllegalArgumentException.class, () -> PostgresUtils.checkFirstRecordWaitTime(tooLongConfig));
+    assertEquals(Optional.of(tooLongTimeout), PostgresUtils.getFirstRecordWaitSeconds(tooLongConfig));
assertEquals(MAX_FIRST_RECORD_WAIT_TIME, PostgresUtils.getFirstRecordWaitTime(tooLongConfig));
}

@@ -248,10 +248,10 @@
"initial_waiting_seconds": {
"type": "integer",
"title": "Initial Waiting Time in Seconds (Advanced)",
-"description": "The amount of time the connector will wait when it launches to figure out whether there is new data to sync or not. Default to 5 minutes. For more information read about <a href=\"https://docs.airbyte.com/integrations/sources/postgres#initial-waiting-time\">initial waiting time</a>.",
+"description": "The amount of time the connector will wait when it launches to figure out whether there is new data to sync or not. Default to 300 seconds. Valid range: 120 seconds to 1200 seconds. For more information read about <a href=\"https://docs.airbyte.com/integrations/sources/postgres#initial-waiting-time\">initial waiting time</a>.",
"default": 300,
"order": 4,
-"min": 30,
+"min": 120,
"max": 1200
}
}
3 changes: 2 additions & 1 deletion docs/integrations/sources/postgres.md
@@ -354,14 +354,15 @@ Possible solutions include:

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------|
+| 0.4.42 | 2022-08-03 | [15273](https://github.com/airbytehq/airbyte/pull/15273) | Fix a bug in `0.4.36` and correctly parse the CDC initial record waiting time |
| 0.4.41 | 2022-08-03 | [15077](https://github.com/airbytehq/airbyte/pull/15077) | Sync data from beginning if the LSN is no longer valid in CDC |
| | 2022-08-03 | [14903](https://github.com/airbytehq/airbyte/pull/14903) | Emit state messages more frequently |
| 0.4.40 | 2022-08-03 | [15187](https://github.com/airbytehq/airbyte/pull/15187) | Add support for BCE dates/timestamps |
| | 2022-08-03 | [14534](https://github.com/airbytehq/airbyte/pull/14534) | Align regular and CDC integration tests and data mappers |
| 0.4.39 | 2022-08-02 | [14801](https://github.com/airbytehq/airbyte/pull/14801) | Fix multiply log bindings |
| 0.4.38 | 2022-07-26 | [14362](https://github.com/airbytehq/airbyte/pull/14362) | Integral columns are now discovered as int64 fields. |
| 0.4.37 | 2022-07-22 | [14714](https://github.com/airbytehq/airbyte/pull/14714) | Clarified error message when invalid cursor column selected |
-| 0.4.36 | 2022-07-21 | [14451](https://github.com/airbytehq/airbyte/pull/14451) | Make initial CDC waiting time configurable |
+| 0.4.36 | 2022-07-21 | [14451](https://github.com/airbytehq/airbyte/pull/14451) | Make initial CDC waiting time configurable (⛔ this version has a bug and will not work; use `0.4.42` instead) |
| 0.4.35 | 2022-07-14 | [14574](https://github.com/airbytehq/airbyte/pull/14574) | Removed additionalProperties:false from JDBC source connectors |
| 0.4.34 | 2022-07-17 | [13840](https://github.com/airbytehq/airbyte/pull/13840) | Added the ability to connect using different SSL modes and SSL certificates. |
| 0.4.33 | 2022-07-14 | [14586](https://github.com/airbytehq/airbyte/pull/14586) | Validate source JDBC url parameters |