diff --git a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java index fa390578dcb4..ea58d1cb0108 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java +++ b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java @@ -299,7 +299,7 @@ protected DataSource createDataSource(final JsonNode config) { jdbcConfig.has(JdbcUtils.PASSWORD_KEY) ? jdbcConfig.get(JdbcUtils.PASSWORD_KEY).asText() : null, driverClass, jdbcConfig.get(JdbcUtils.JDBC_URL_KEY).asText(), - JdbcUtils.parseJdbcParameters(jdbcConfig, JdbcUtils.CONNECTION_PROPERTIES_KEY, getJdbcParameterDelimiter())); + getConnectionProperties(config)); // Record the data source so that it can be closed. dataSources.add(dataSource); return dataSource; @@ -339,7 +339,7 @@ protected Map getConnectionProperties(final JsonNode config) { * @param defaultParameters connection properties map as specified by each Jdbc source * @throws IllegalArgumentException */ - private void assertCustomParametersDontOverwriteDefaultParameters(final Map customParameters, + protected static void assertCustomParametersDontOverwriteDefaultParameters(final Map customParameters, final Map defaultParameters) { for (final String key : defaultParameters.keySet()) { if (customParameters.containsKey(key) && !Objects.equals(customParameters.get(key), defaultParameters.get(key))) { diff --git a/airbyte-integrations/connectors/source-jdbc/src/test/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-jdbc/src/test/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSourceAcceptanceTest.java index 5f690f08db38..1e5eea3bbcaf 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/test/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-jdbc/src/test/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSourceAcceptanceTest.java @@ -4,6 +4,9 @@ package io.airbyte.integrations.source.jdbc; +import static io.airbyte.integrations.source.jdbc.AbstractJdbcSource.assertCustomParametersDontOverwriteDefaultParameters; +import static org.junit.jupiter.api.Assertions.assertThrows; + import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; import io.airbyte.commons.features.EnvVariableFeatureFlags; @@ -22,12 +25,16 @@ import io.airbyte.protocol.models.AirbyteStateMessage.AirbyteStateType; import io.airbyte.protocol.models.AirbyteStreamState; import io.airbyte.test.utils.PostgreSQLContainerHelper; +import io.airbyte.integrations.util.HostPortResolver; import java.sql.JDBCType; import java.util.List; +import java.util.Map; +import java.util.Objects; import java.util.Set; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.PostgreSQLContainer; @@ -43,6 +50,7 @@ class AbstractJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTest { private static PostgreSQLContainer PSQL_DB; private JsonNode config; + private String dbName; @BeforeAll static void init() { @@ -53,7 +61,7 @@ static void init() { @BeforeEach public void setup() throws Exception { - final String dbName = Strings.addRandomSuffix("db", "_", 10).toLowerCase(); + dbName = Strings.addRandomSuffix("db", "_", 10).toLowerCase(); config = Jsons.jsonNode(ImmutableMap.builder() .put(JdbcUtils.HOST_KEY, PSQL_DB.getHost()) @@ -85,6 +93,18 @@ public JsonNode getConfig() { return config; } + public JsonNode getConfigWithConnectionProperties(final PostgreSQLContainer psqlDb, final String dbName, final String additionalParameters) { + return Jsons.jsonNode(ImmutableMap.builder() + .put(JdbcUtils.HOST_KEY, HostPortResolver.resolveHost(psqlDb)) + .put(JdbcUtils.PORT_KEY, HostPortResolver.resolvePort(psqlDb)) + .put(JdbcUtils.DATABASE_KEY, dbName) + .put(JdbcUtils.SCHEMAS_KEY, List.of(SCHEMA_NAME)) + .put(JdbcUtils.USERNAME_KEY, psqlDb.getUsername()) + .put(JdbcUtils.PASSWORD_KEY, psqlDb.getPassword()) + .put(JdbcUtils.CONNECTION_PROPERTIES_KEY, additionalParameters) + .build()); + } + @Override public String getDriverClass() { return PostgresTestSource.DRIVER_CLASS; @@ -161,4 +181,18 @@ public static void main(final String[] args) throws Exception { } + @Test + void testCustomParametersOverwriteDefaultParametersExpectException() { + final String connectionPropertiesUrl = "ssl=false"; + final JsonNode config = getConfigWithConnectionProperties(PSQL_DB, dbName, connectionPropertiesUrl); + final Map customParameters = JdbcUtils.parseJdbcParameters(config, JdbcUtils.CONNECTION_PROPERTIES_KEY, "&"); + final Map defaultParameters = Map.of( + "ssl", "true", + "sslmode", "require" + ); + assertThrows(IllegalArgumentException.class, () -> { + assertCustomParametersDontOverwriteDefaultParameters(customParameters, defaultParameters); + }); + } + } diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java index d0ebab199c30..a8da51ae7d4d 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java @@ -72,15 +72,6 @@ public class PostgresSource extends AbstractJdbcSource implements Source { private static final Logger LOGGER = LoggerFactory.getLogger(PostgresSource.class); - - public static final String CDC_LSN = "_ab_cdc_lsn"; - public static final String DATABASE_KEY = "database"; - public static final String HOST_KEY = "host"; - public static final String JDBC_URL_KEY = "jdbc_url"; - public static final String PASSWORD_KEY = "password"; - public static final String PORT_KEY = "port"; - public static final String SCHEMAS_KEY = "schemas"; - public static final String USERNAME_KEY = "username"; static final String DRIVER_CLASS = DatabaseDriver.POSTGRESQL.getDriverClassName(); static final Map SSL_JDBC_PARAMETERS = ImmutableMap.of( "ssl", "true",