From 75cea88f47395baf207c339fec3ef0e3915fa629 Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Wed, 3 Aug 2022 15:32:10 -0700 Subject: [PATCH 1/8] Fix first record wait time parsing bug --- .../source/postgres/PostgresUtils.java | 15 +++++++++------ .../source/postgres/PostgresUtilsTest.java | 6 ++++-- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresUtils.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresUtils.java index a866b29c521c..db1981fe4d01 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresUtils.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresUtils.java @@ -31,18 +31,21 @@ public static boolean isCdc(final JsonNode config) { } public static Duration getFirstRecordWaitTime(final JsonNode config) { - if (config.has("initial_waiting_seconds")) { - final int seconds = config.get("initial_waiting_seconds").asInt(); + final JsonNode replicationMethod = config.get("replication_method"); + Duration firstRecordWaitTime = DEFAULT_FIRST_RECORD_WAIT_TIME; + if (replicationMethod != null && replicationMethod.has("initial_waiting_seconds")) { + final int seconds = config.get("replication_method").get("initial_waiting_seconds").asInt(); if (seconds > MAX_FIRST_RECORD_WAIT_TIME.getSeconds()) { LOGGER.warn("First record waiting time is overridden to {} minutes, which is the max time allowed for safety.", MAX_FIRST_RECORD_WAIT_TIME.toMinutes()); - return MAX_FIRST_RECORD_WAIT_TIME; + firstRecordWaitTime = MAX_FIRST_RECORD_WAIT_TIME; + } else { + firstRecordWaitTime = Duration.ofSeconds(seconds); } - - return Duration.ofSeconds(seconds); } - return DEFAULT_FIRST_RECORD_WAIT_TIME; + LOGGER.info("First record waiting time: {} seconds", firstRecordWaitTime.getSeconds()); + return firstRecordWaitTime; } } diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresUtilsTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresUtilsTest.java index 7ced8930d3b2..d4144499d8fb 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresUtilsTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresUtilsTest.java @@ -36,10 +36,12 @@ void testGetFirstRecordWaitTime() { final JsonNode emptyConfig = Jsons.jsonNode(Collections.emptyMap()); assertEquals(PostgresUtils.DEFAULT_FIRST_RECORD_WAIT_TIME, PostgresUtils.getFirstRecordWaitTime(emptyConfig)); - final JsonNode normalConfig = Jsons.jsonNode(Map.of("initial_waiting_seconds", 500)); + final JsonNode normalConfig = Jsons.jsonNode(Map.of("replication_method", + Map.of("method", "CDC", "initial_waiting_seconds", 500))); assertEquals(Duration.ofSeconds(500), PostgresUtils.getFirstRecordWaitTime(normalConfig)); - final JsonNode tooLongConfig = Jsons.jsonNode(Map.of("initial_waiting_seconds", MAX_FIRST_RECORD_WAIT_TIME.getSeconds() + 100)); + final JsonNode tooLongConfig = Jsons.jsonNode(Map.of("replication_method", + Map.of("method", "CDC", "initial_waiting_seconds", MAX_FIRST_RECORD_WAIT_TIME.getSeconds() + 100))); assertEquals(MAX_FIRST_RECORD_WAIT_TIME, PostgresUtils.getFirstRecordWaitTime(tooLongConfig)); } From a71d43f58a0267e6614b95b38668cc0099c13bf8 Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Wed, 3 Aug 2022 15:40:16 -0700 Subject: [PATCH 2/8] Bump version --- .../connectors/source-postgres-strict-encrypt/Dockerfile | 2 +- airbyte-integrations/connectors/source-postgres/Dockerfile | 2 +- docs/integrations/sources/postgres.md | 3 ++- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile b/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile index 5994668f109f..6fec5f54a9c3 100644 --- a/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile +++ b/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres-strict-encrypt COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.4.41 +LABEL io.airbyte.version=0.4.42 LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt diff --git a/airbyte-integrations/connectors/source-postgres/Dockerfile b/airbyte-integrations/connectors/source-postgres/Dockerfile index 538075a6572a..db21839f69fe 100644 --- a/airbyte-integrations/connectors/source-postgres/Dockerfile +++ b/airbyte-integrations/connectors/source-postgres/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.4.41 +LABEL io.airbyte.version=0.4.42 LABEL io.airbyte.name=airbyte/source-postgres diff --git a/docs/integrations/sources/postgres.md b/docs/integrations/sources/postgres.md index ef319d076573..87b9320bbc66 100644 --- a/docs/integrations/sources/postgres.md +++ b/docs/integrations/sources/postgres.md @@ -354,6 +354,7 @@ Possible solutions include: | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------| +| 0.4.42 | 2022-08-03 | [15273](https://github.com/airbytehq/airbyte/pull/15273) | Fix a bug in `0.4.36` and correctly parse the CDC initial record waiting time | | 0.4.41 | 2022-08-03 | [15077](https://github.com/airbytehq/airbyte/pull/15077) | Sync data from beginning if the LSN is no longer valid in CDC | | | 2022-08-03 | [14903](https://github.com/airbytehq/airbyte/pull/14903) | Emit state messages more frequently | | 0.4.40 | 2022-08-03 | [15187](https://github.com/airbytehq/airbyte/pull/15187) | Add support for BCE dates/timestamps | @@ -361,7 +362,7 @@ Possible solutions include: | 0.4.39 | 2022-08-02 | [14801](https://github.com/airbytehq/airbyte/pull/14801) | Fix multiply log bindings | | 0.4.38 | 2022-07-26 | [14362](https://github.com/airbytehq/airbyte/pull/14362) | Integral columns are now discovered as int64 fields. | | 0.4.37 | 2022-07-22 | [14714](https://github.com/airbytehq/airbyte/pull/14714) | Clarified error message when invalid cursor column selected | -| 0.4.36 | 2022-07-21 | [14451](https://github.com/airbytehq/airbyte/pull/14451) | Make initial CDC waiting time configurable | +| 0.4.36 | 2022-07-21 | [14451](https://github.com/airbytehq/airbyte/pull/14451) | Make initial CDC waiting time configurable (⛔ this version has a bug and will not work; use `0.4.42` instead) | | 0.4.35 | 2022-07-14 | [14574](https://github.com/airbytehq/airbyte/pull/14574) | Removed additionalProperties:false from JDBC source connectors | | 0.4.34 | 2022-07-17 | [13840](https://github.com/airbytehq/airbyte/pull/13840) | Added the ability to connect using different SSL modes and SSL certificates. | | 0.4.33 | 2022-07-14 | [14586](https://github.com/airbytehq/airbyte/pull/14586) | Validate source JDBC url parameters | From 0bcbd3581db775c78380a93cc6b4f140f8499801 Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Wed, 3 Aug 2022 16:20:13 -0700 Subject: [PATCH 3/8] Add connection check for first record waiting time --- .../source/postgres/PostgresSource.java | 4 +++ .../source/postgres/PostgresUtils.java | 27 ++++++++++++++++--- .../source/postgres/PostgresUtilsTest.java | 12 ++++++++- 3 files changed, 38 insertions(+), 5 deletions(-) diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java index 143f1694a0dc..ed18a3cfed45 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java @@ -262,6 +262,10 @@ public List> getCheckOperations(final J } }); + + checkOperations.add(database -> { + PostgresUtils.checkFirstRecordWaitTime(config); + }); } return checkOperations; diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresUtils.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresUtils.java index db1981fe4d01..180c33d4f72a 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresUtils.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresUtils.java @@ -6,6 +6,7 @@ import com.fasterxml.jackson.databind.JsonNode; import java.time.Duration; +import java.util.Optional; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,17 +31,35 @@ public static boolean isCdc(final JsonNode config) { return isCdc; } - public static Duration getFirstRecordWaitTime(final JsonNode config) { + public static Optional getFirstRecordWaitSeconds(final JsonNode config) { final JsonNode replicationMethod = config.get("replication_method"); - Duration firstRecordWaitTime = DEFAULT_FIRST_RECORD_WAIT_TIME; if (replicationMethod != null && replicationMethod.has("initial_waiting_seconds")) { final int seconds = config.get("replication_method").get("initial_waiting_seconds").asInt(); - if (seconds > MAX_FIRST_RECORD_WAIT_TIME.getSeconds()) { + return Optional.of(seconds); + } + return Optional.empty(); + } + + public static void checkFirstRecordWaitTime(final JsonNode config) { + final Optional firstRecordWaitSeconds = getFirstRecordWaitSeconds(config); + if (firstRecordWaitSeconds.isPresent() && firstRecordWaitSeconds.get() > MAX_FIRST_RECORD_WAIT_TIME.getSeconds()) { + throw new IllegalArgumentException(String.format( + "Initial waiting seconds cannot be larger than %d seconds for safety.", + MAX_FIRST_RECORD_WAIT_TIME.getSeconds())); + } + } + + public static Duration getFirstRecordWaitTime(final JsonNode config) { + Duration firstRecordWaitTime = DEFAULT_FIRST_RECORD_WAIT_TIME; + + final Optional firstRecordWaitSeconds = getFirstRecordWaitSeconds(config); + if (firstRecordWaitSeconds.isPresent()) { + if (firstRecordWaitSeconds.get() > MAX_FIRST_RECORD_WAIT_TIME.getSeconds()) { LOGGER.warn("First record waiting time is overridden to {} minutes, which is the max time allowed for safety.", MAX_FIRST_RECORD_WAIT_TIME.toMinutes()); firstRecordWaitTime = MAX_FIRST_RECORD_WAIT_TIME; } else { - firstRecordWaitTime = Duration.ofSeconds(seconds); + firstRecordWaitTime = Duration.ofSeconds(firstRecordWaitSeconds.get()); } } diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresUtilsTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresUtilsTest.java index d4144499d8fb..fa5e8f6d64b5 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresUtilsTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresUtilsTest.java @@ -5,8 +5,10 @@ package io.airbyte.integrations.source.postgres; import static io.airbyte.integrations.source.postgres.PostgresUtils.MAX_FIRST_RECORD_WAIT_TIME; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import com.fasterxml.jackson.databind.JsonNode; @@ -16,6 +18,7 @@ import java.time.Duration; import java.util.Collections; import java.util.Map; +import java.util.Optional; import org.junit.jupiter.api.Test; class PostgresUtilsTest { @@ -34,14 +37,21 @@ void testIsCdc() { @Test void testGetFirstRecordWaitTime() { final JsonNode emptyConfig = Jsons.jsonNode(Collections.emptyMap()); + assertDoesNotThrow(() -> PostgresUtils.checkFirstRecordWaitTime(emptyConfig)); + assertEquals(Optional.empty(), PostgresUtils.getFirstRecordWaitSeconds(emptyConfig)); assertEquals(PostgresUtils.DEFAULT_FIRST_RECORD_WAIT_TIME, PostgresUtils.getFirstRecordWaitTime(emptyConfig)); final JsonNode normalConfig = Jsons.jsonNode(Map.of("replication_method", Map.of("method", "CDC", "initial_waiting_seconds", 500))); + assertDoesNotThrow(() -> PostgresUtils.checkFirstRecordWaitTime(normalConfig)); + assertEquals(Optional.of(500), PostgresUtils.getFirstRecordWaitSeconds(normalConfig)); assertEquals(Duration.ofSeconds(500), PostgresUtils.getFirstRecordWaitTime(normalConfig)); + final int tooLongTimout = (int) MAX_FIRST_RECORD_WAIT_TIME.getSeconds() + 100; final JsonNode tooLongConfig = Jsons.jsonNode(Map.of("replication_method", - Map.of("method", "CDC", "initial_waiting_seconds", MAX_FIRST_RECORD_WAIT_TIME.getSeconds() + 100))); + Map.of("method", "CDC", "initial_waiting_seconds", tooLongTimout))); + assertThrows(IllegalArgumentException.class, () -> PostgresUtils.checkFirstRecordWaitTime(tooLongConfig)); + assertEquals(Optional.of(tooLongTimout), PostgresUtils.getFirstRecordWaitSeconds(tooLongConfig)); assertEquals(MAX_FIRST_RECORD_WAIT_TIME, PostgresUtils.getFirstRecordWaitTime(tooLongConfig)); } From 9243122e215a492846e737e355bd0c8403a934e4 Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Wed, 3 Aug 2022 17:52:07 -0700 Subject: [PATCH 4/8] Update spec --- .../connectors/source-postgres/src/main/resources/spec.json | 4 ++-- .../source-postgres/src/test/resources/expected_spec.json | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/airbyte-integrations/connectors/source-postgres/src/main/resources/spec.json b/airbyte-integrations/connectors/source-postgres/src/main/resources/spec.json index 88af03e13bfd..e1761821f404 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/source-postgres/src/main/resources/spec.json @@ -270,10 +270,10 @@ "initial_waiting_seconds": { "type": "integer", "title": "Initial Waiting Time in Seconds (Advanced)", - "description": "The amount of time the connector will wait when it launches to figure out whether there is new data to sync or not. Default to 5 minutes. For more information read about initial waiting time.", + "description": "The amount of time the connector will wait when it launches to figure out whether there is new data to sync or not. Default to 300 seconds. Valid range: 120 seconds to 1200 seconds. For more information read about initial waiting time.", "default": 300, "order": 4, - "min": 30, + "min": 120, "max": 1200 } } diff --git a/airbyte-integrations/connectors/source-postgres/src/test/resources/expected_spec.json b/airbyte-integrations/connectors/source-postgres/src/test/resources/expected_spec.json index e7d8b478e0fb..cb8c04865a86 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/resources/expected_spec.json +++ b/airbyte-integrations/connectors/source-postgres/src/test/resources/expected_spec.json @@ -248,10 +248,10 @@ "initial_waiting_seconds": { "type": "integer", "title": "Initial Waiting Time in Seconds (Advanced)", - "description": "The amount of time the connector will wait when it launches to figure out whether there is new data to sync or not. Default to 5 minutes. For more information read about initial waiting time.", + "description": "The amount of time the connector will wait when it launches to figure out whether there is new data to sync or not. Default to 300 seconds. Valid range: 120 seconds to 1200 seconds. For more information read about initial waiting time.", "default": 300, "order": 4, - "min": 30, + "min": 120, "max": 1200 } } From 357bf1daa0c2172b5c2c7a426f3cf0399c7b5a8d Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Wed, 3 Aug 2022 19:38:38 -0700 Subject: [PATCH 5/8] Override initial waiting time when it is too short --- .../source/postgres/PostgresUtils.java | 29 ++++++++++++++----- .../source/postgres/PostgresUtilsTest.java | 14 +++++++-- 2 files changed, 33 insertions(+), 10 deletions(-) diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresUtils.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresUtils.java index 180c33d4f72a..db2117dd8021 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresUtils.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresUtils.java @@ -16,6 +16,7 @@ public class PostgresUtils { private static final String PGOUTPUT_PLUGIN = "pgoutput"; + public static final Duration MIN_FIRST_RECORD_WAIT_TIME = Duration.ofMinutes(2); public static final Duration MAX_FIRST_RECORD_WAIT_TIME = Duration.ofMinutes(20); public static final Duration DEFAULT_FIRST_RECORD_WAIT_TIME = Duration.ofMinutes(5); @@ -41,25 +42,39 @@ public static Optional getFirstRecordWaitSeconds(final JsonNode config) } public static void checkFirstRecordWaitTime(final JsonNode config) { + // we need to skip the check because in tests, we set initial_waiting_seconds + // to 5 seconds for performance reasons, which is shorter than the minimum + // value allowed in production + if (config.has("is_test") && config.get("is_test").asBoolean()) { + return; + } + final Optional firstRecordWaitSeconds = getFirstRecordWaitSeconds(config); - if (firstRecordWaitSeconds.isPresent() && firstRecordWaitSeconds.get() > MAX_FIRST_RECORD_WAIT_TIME.getSeconds()) { - throw new IllegalArgumentException(String.format( - "Initial waiting seconds cannot be larger than %d seconds for safety.", - MAX_FIRST_RECORD_WAIT_TIME.getSeconds())); + if (firstRecordWaitSeconds.isPresent()) { + final int seconds = firstRecordWaitSeconds.get(); + if (seconds < MIN_FIRST_RECORD_WAIT_TIME.getSeconds() || seconds > MAX_FIRST_RECORD_WAIT_TIME.getSeconds()) { + throw new IllegalArgumentException( + String.format("initial_waiting_seconds must be between %d and %d seconds", + MIN_FIRST_RECORD_WAIT_TIME.getSeconds(), MAX_FIRST_RECORD_WAIT_TIME.getSeconds())); + } } } public static Duration getFirstRecordWaitTime(final JsonNode config) { + final boolean isTest = config.has("is_test") && config.get("is_test").asBoolean(); Duration firstRecordWaitTime = DEFAULT_FIRST_RECORD_WAIT_TIME; final Optional firstRecordWaitSeconds = getFirstRecordWaitSeconds(config); if (firstRecordWaitSeconds.isPresent()) { - if (firstRecordWaitSeconds.get() > MAX_FIRST_RECORD_WAIT_TIME.getSeconds()) { + firstRecordWaitTime = Duration.ofSeconds(firstRecordWaitSeconds.get()); + if (!isTest && firstRecordWaitTime.compareTo(MIN_FIRST_RECORD_WAIT_TIME) < 0) { + LOGGER.warn("First record waiting time is overridden to {} minutes, which is the min time allowed for safety.", + MIN_FIRST_RECORD_WAIT_TIME.toMinutes()); + firstRecordWaitTime = MIN_FIRST_RECORD_WAIT_TIME; + } else if (!isTest && firstRecordWaitTime.compareTo(MAX_FIRST_RECORD_WAIT_TIME) > 0) { LOGGER.warn("First record waiting time is overridden to {} minutes, which is the max time allowed for safety.", MAX_FIRST_RECORD_WAIT_TIME.toMinutes()); firstRecordWaitTime = MAX_FIRST_RECORD_WAIT_TIME; - } else { - firstRecordWaitTime = Duration.ofSeconds(firstRecordWaitSeconds.get()); } } diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresUtilsTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresUtilsTest.java index fa5e8f6d64b5..a3d3c4deccf0 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresUtilsTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresUtilsTest.java @@ -5,6 +5,7 @@ package io.airbyte.integrations.source.postgres; import static io.airbyte.integrations.source.postgres.PostgresUtils.MAX_FIRST_RECORD_WAIT_TIME; +import static io.airbyte.integrations.source.postgres.PostgresUtils.MIN_FIRST_RECORD_WAIT_TIME; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -47,11 +48,18 @@ void testGetFirstRecordWaitTime() { assertEquals(Optional.of(500), PostgresUtils.getFirstRecordWaitSeconds(normalConfig)); assertEquals(Duration.ofSeconds(500), PostgresUtils.getFirstRecordWaitTime(normalConfig)); - final int tooLongTimout = (int) MAX_FIRST_RECORD_WAIT_TIME.getSeconds() + 100; + final int tooShortTimeout = (int) MIN_FIRST_RECORD_WAIT_TIME.getSeconds() - 1; + final JsonNode tooShortConfig = Jsons.jsonNode(Map.of("replication_method", + Map.of("method", "CDC", "initial_waiting_seconds", tooShortTimeout))); + assertThrows(IllegalArgumentException.class, () -> PostgresUtils.checkFirstRecordWaitTime(tooShortConfig)); + assertEquals(Optional.of(tooShortTimeout), PostgresUtils.getFirstRecordWaitSeconds(tooShortConfig)); + assertEquals(MIN_FIRST_RECORD_WAIT_TIME, PostgresUtils.getFirstRecordWaitTime(tooShortConfig)); + + final int tooLongTimeout = (int) MAX_FIRST_RECORD_WAIT_TIME.getSeconds() + 1; final JsonNode tooLongConfig = Jsons.jsonNode(Map.of("replication_method", - Map.of("method", "CDC", "initial_waiting_seconds", tooLongTimout))); + Map.of("method", "CDC", "initial_waiting_seconds", tooLongTimeout))); assertThrows(IllegalArgumentException.class, () -> PostgresUtils.checkFirstRecordWaitTime(tooLongConfig)); - assertEquals(Optional.of(tooLongTimout), PostgresUtils.getFirstRecordWaitSeconds(tooLongConfig)); + assertEquals(Optional.of(tooLongTimeout), PostgresUtils.getFirstRecordWaitSeconds(tooLongConfig)); assertEquals(MAX_FIRST_RECORD_WAIT_TIME, PostgresUtils.getFirstRecordWaitTime(tooLongConfig)); } From 5f72d078621f9336909c8172facf1da5142687da Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Wed, 3 Aug 2022 19:40:58 -0700 Subject: [PATCH 6/8] Set is_test to true for cdc integration tests --- .../sources/CdcPostgresSourceAcceptanceTest.java | 1 + .../integration_tests/sources/CdcPostgresSourceDatatypeTest.java | 1 + .../integrations/source/postgres/CdcPostgresSourceTest.java | 1 + .../source/postgres/PostgresCdcGetPublicizedTablesTest.java | 1 + 4 files changed, 4 insertions(+) diff --git a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcPostgresSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcPostgresSourceAcceptanceTest.java index ac7a1762777a..92a618655098 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcPostgresSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcPostgresSourceAcceptanceTest.java @@ -77,6 +77,7 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc .put(JdbcUtils.PASSWORD_KEY, container.getPassword()) .put("replication_method", replicationMethod) .put(JdbcUtils.SSL_KEY, false) + .put("is_test", true) .build()); try (final DSLContext dslContext = DSLContextFactory.create( diff --git a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcPostgresSourceDatatypeTest.java b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcPostgresSourceDatatypeTest.java index f6b980e42c21..1426e7c67de8 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcPostgresSourceDatatypeTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcPostgresSourceDatatypeTest.java @@ -45,6 +45,7 @@ protected Database setupDatabase() throws Exception { .put("replication_slot", SLOT_NAME_BASE) .put("publication", PUBLICATION) .put("initial_waiting_seconds", INITIAL_WAITING_SECONDS) + .put("is_test", true) .build()); config = Jsons.jsonNode(ImmutableMap.builder() .put(JdbcUtils.HOST_KEY, HostPortResolver.resolveHost(container)) diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java index 7d011aad1766..68f34b6df5a9 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java @@ -139,6 +139,7 @@ private JsonNode getConfig(final String dbName) { .put(JdbcUtils.USERNAME_KEY, container.getUsername()) .put(JdbcUtils.PASSWORD_KEY, container.getPassword()) .put(JdbcUtils.SSL_KEY, false) + .put("is_test", true) .put("replication_method", replicationMethod) .build()); } diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresCdcGetPublicizedTablesTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresCdcGetPublicizedTablesTest.java index c052c10e4abc..e3c44bca9e63 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresCdcGetPublicizedTablesTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresCdcGetPublicizedTablesTest.java @@ -91,6 +91,7 @@ private JsonNode getConfig(final PostgreSQLContainer psqlDb, final String dbN .put(JdbcUtils.USERNAME_KEY, psqlDb.getUsername()) .put(JdbcUtils.PASSWORD_KEY, psqlDb.getPassword()) .put(JdbcUtils.SSL_KEY, false) + .put("is_test", true) .build()); } From 683491b154f4172b019cf5f0e0f66cdb8327b754 Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Wed, 3 Aug 2022 20:47:52 -0700 Subject: [PATCH 7/8] Fix integration test --- .../sources/CdcPostgresSourceDatatypeTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcPostgresSourceDatatypeTest.java b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcPostgresSourceDatatypeTest.java index 1426e7c67de8..c609f07cec77 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcPostgresSourceDatatypeTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcPostgresSourceDatatypeTest.java @@ -45,7 +45,6 @@ protected Database setupDatabase() throws Exception { .put("replication_slot", SLOT_NAME_BASE) .put("publication", PUBLICATION) .put("initial_waiting_seconds", INITIAL_WAITING_SECONDS) - .put("is_test", true) .build()); config = Jsons.jsonNode(ImmutableMap.builder() .put(JdbcUtils.HOST_KEY, HostPortResolver.resolveHost(container)) @@ -55,6 +54,7 @@ protected Database setupDatabase() throws Exception { .put(JdbcUtils.USERNAME_KEY, container.getUsername()) .put(JdbcUtils.PASSWORD_KEY, container.getPassword()) .put("replication_method", replicationMethod) + .put("is_test", true) .put(JdbcUtils.SSL_KEY, false) .build()); From 2688118d0f66c90c867e97bd3d6bc1fae1032e60 Mon Sep 17 00:00:00 2001 From: Octavia Squidington III Date: Thu, 4 Aug 2022 07:50:23 +0000 Subject: [PATCH 8/8] auto-bump connector version [ci skip] --- .../init/src/main/resources/seed/source_definitions.yaml | 2 +- .../init/src/main/resources/seed/source_specs.yaml | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 72a801632ed6..e909e8c179c6 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -762,7 +762,7 @@ - name: Postgres sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750 dockerRepository: airbyte/source-postgres - dockerImageTag: 0.4.41 + dockerImageTag: 0.4.42 documentationUrl: https://docs.airbyte.io/integrations/sources/postgres icon: postgresql.svg sourceType: database diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 95157f0b20c1..70479556b0f0 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -7140,7 +7140,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-postgres:0.4.41" +- dockerImage: "airbyte/source-postgres:0.4.42" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres" connectionSpecification: @@ -7423,11 +7423,12 @@ title: "Initial Waiting Time in Seconds (Advanced)" description: "The amount of time the connector will wait when it launches\ \ to figure out whether there is new data to sync or not. Default\ - \ to 5 minutes. For more information read about initial waiting time." default: 300 order: 4 - min: 30 + min: 120 max: 1200 tunnel_method: type: "object"