Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source postgres: disable ssl modes allow and prefer in CDC mode #18256

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
- name: AlloyDB for PostgreSQL
sourceDefinitionId: 1fa90628-2b9e-11ed-a261-0242ac120002
dockerRepository: airbyte/source-alloydb
dockerImageTag: 1.0.15
dockerImageTag: 1.0.16
documentationUrl: https://docs.airbyte.com/integrations/sources/alloydb
icon: alloydb.svg
sourceType: database
Expand Down Expand Up @@ -864,7 +864,7 @@
- name: Postgres
sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerRepository: airbyte/source-postgres
dockerImageTag: 1.0.20
dockerImageTag: 1.0.21
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
icon: postgresql.svg
sourceType: database
Expand Down
4 changes: 2 additions & 2 deletions airbyte-config/init/src/main/resources/seed/source_specs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-alloydb:1.0.15"
- dockerImage: "airbyte/source-alloydb:1.0.16"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
connectionSpecification:
Expand Down Expand Up @@ -8792,7 +8792,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-postgres:1.0.20"
- dockerImage: "airbyte/source-postgres:1.0.21"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-alloydb-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.15
LABEL io.airbyte.version=1.0.16
LABEL io.airbyte.name=airbyte/source-alloydb-strict-encrypt
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-alloydb/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-alloydb

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.15
LABEL io.airbyte.version=1.0.16
LABEL io.airbyte.name=airbyte/source-alloydb
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.20
LABEL io.airbyte.version=1.0.21
LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.20
LABEL io.airbyte.version=1.0.21
LABEL io.airbyte.name=airbyte/source-postgres
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.functional.CheckedConsumer;
Expand Down Expand Up @@ -78,6 +79,7 @@ public class PostgresSource extends AbstractJdbcSource<JDBCType> implements Sour
private static final int INTERMEDIATE_STATE_EMISSION_FREQUENCY = 10_000;

public static final String PARAM_SSLMODE = "sslmode";
public static final String SSL_MODE = "ssl_mode";
public static final String PARAM_SSL = "ssl";
public static final String PARAM_SSL_TRUE = "true";
public static final String PARAM_SSL_FALSE = "false";
Expand All @@ -87,11 +89,13 @@ public class PostgresSource extends AbstractJdbcSource<JDBCType> implements Sour
public static final String CA_CERTIFICATE_PATH = "ca_certificate_path";
public static final String SSL_KEY = "sslkey";
public static final String SSL_PASSWORD = "sslpassword";
public static final String MODE = "mode";
static final Map<String, String> SSL_JDBC_PARAMETERS = ImmutableMap.of(
"ssl", "true",
"sslmode", "require");
private List<String> schemas;
private final FeatureFlags featureFlags;
private static final Set<String> INVALID_CDC_SSL_MODES = ImmutableSet.of("allow", "prefer");

public static Source sshWrappedSource() {
return new SshWrappedSource(new PostgresSource(), JdbcUtils.HOST_LIST_KEY, JdbcUtils.PORT_LIST_KEY);
Expand Down Expand Up @@ -464,6 +468,23 @@ public static void main(final String[] args) throws Exception {
LOGGER.info("completed source: {}", PostgresSource.class);
}

@Override
public AirbyteConnectionStatus check(final JsonNode config) throws Exception {
if (PostgresUtils.isCdc(config)) {
if (config.has(SSL_MODE) && config.get(SSL_MODE).has(MODE)){
String sslModeValue = config.get(SSL_MODE).get(MODE).asText();
if (INVALID_CDC_SSL_MODES.contains(sslModeValue)) {
return new AirbyteConnectionStatus()
.withStatus(Status.FAILED)
.withMessage(String.format(
"In CDC replication mode ssl value '%s' is invalid. Please use one of the following SSL modes: disable, require, verify-ca, verify-full",
sslModeValue));
}
}
}
return super.check(config);
}

@Override
protected String toSslJdbcParam(final SslMode sslMode) {
return toSslJdbcParamInternal(sslMode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.airbyte.db.factory.DatabaseDriver;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.CatalogHelpers;
Expand All @@ -35,6 +36,7 @@
import io.airbyte.test.utils.PostgreSQLContainerHelper;
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -197,4 +199,34 @@ void testIsCdc() {
assertTrue(PostgresUtils.isCdc(config));
}

@Test
void testAllowSSLWithCdcReplicationMethod() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice tests!


JsonNode config = getCDCAndSslModeConfig("allow");

final AirbyteConnectionStatus actual = new PostgresSource().check(config);
assertEquals(AirbyteConnectionStatus.Status.FAILED, actual.getStatus());
assertTrue(actual.getMessage().contains("In CDC replication mode ssl value 'allow' is invalid"));
}

@Test
void testPreferSSLWithCdcReplicationMethod() throws Exception {

JsonNode config = getCDCAndSslModeConfig("prefer");

final AirbyteConnectionStatus actual = new PostgresSource().check(config);
assertEquals(AirbyteConnectionStatus.Status.FAILED, actual.getStatus());
assertTrue(actual.getMessage().contains("In CDC replication mode ssl value 'prefer' is invalid"));
}

private JsonNode getCDCAndSslModeConfig(String sslMode) {
return Jsons.jsonNode(ImmutableMap.builder()
.put(JdbcUtils.SSL_KEY, true)
.put(JdbcUtils.SSL_MODE_KEY, Map.of(JdbcUtils.MODE_KEY, sslMode))
.put("replication_method", Map.of("method", "CDC",
"replication_slot", "slot",
"publication", "ab_pub"))
.build());
}

}
1 change: 1 addition & 0 deletions docs/integrations/sources/alloydb.md
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:-------------------------------------------------|
| 1.0.16 | 2022-10-25 | [18256](https://github.com/airbytehq/airbyte/pull/18256) | Disable allow and prefer ssl modes in CDC mode |
| | 2022-10-13 | [15535](https://github.com/airbytehq/airbyte/pull/16238) | Update incremental query to avoid data missing when new data is inserted at the same time as a sync starts under non-CDC incremental mode |
| 1.0.15 | 2022-10-11 | [17782](https://github.com/airbytehq/airbyte/pull/17782) | Align with Postgres source v.1.0.15 |
| 1.0.0 | 2022-09-15 | [16776](https://github.com/airbytehq/airbyte/pull/16776) | Align with strict-encrypt version |
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/postgres.md
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,7 @@ The root causes is that the WALs needed for the incremental sync has been remove

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-------------------------------------------------------------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1.0.21 | 2022-10-25 | [18256](https://github.com/airbytehq/airbyte/pull/18256) | Disable allow and prefer ssl modes in CDC mode |
| 1.0.20 | 2022-10-25 | [18383](https://github.com/airbytehq/airbyte/pull/18383) | Better SSH error handling + messages |
| 1.0.19 | 2022-10-21 | [18263](https://github.com/airbytehq/airbyte/pull/18263) | Fixes bug introduced in [15833](https://github.com/airbytehq/airbyte/pull/15833) and adds better error messaging for SSH tunnel in Destinations |
| 1.0.18 | 2022-10-19 | [18087](https://github.com/airbytehq/airbyte/pull/18087) | Better error messaging for configuration errors (SSH configs, choosing an invalid cursor) |
Expand Down