Skip to content

Commit

Permalink
Refactor to enable support for optional JDBC parameters for all JDBC …
Browse files Browse the repository at this point in the history
…destinations (#10421)

* refactoring to allow testing

* MySQLDestination uses connection property map instead of url arguments

* Update jdbc destinations

* A little more generic

* reset to master

* reset to master

* move to jdbcutils

* Align when multiline

* Align when multiline

* Update postgres to use property map

* Move tests to AbstractJdbcDestinationTest

* clean

* Align when multiline

* return property map

* Add postgres tests

* update clickhouse

* reformat

* reset

* reformat

* fix test

* reformat

* fix bug

* Add mssql tests

* refactor test

* fix oracle destination test

* oracle tests

* fix redshift acceptance test

* Pass string

* Revert "Pass string"

This reverts commit 6978217.

* Double deserialization

* Revert "Double deserialization"

This reverts commit ee8d752.

* try updating json_operations

* Revert "try updating json_operations"

This reverts commit c8022c2.

* json parse

* Revert "json parse"

This reverts commit 11a6725.

* Revert "Revert "Double deserialization""

This reverts commit 213f47a.

* Revert "Revert "Revert "Double deserialization"""

This reverts commit 6682245.

* move to constant

* Add comment

* map can be constant

* Add comment

* move map

* hide in method

* no need to create new map

* no need to create new map

* no need to create new map

* enably mysql test

* Update changelogs

* Update changelog

* update changelog

* Bump versions

* bump version

* disable dbt support

* update spec

* update other oracle tests

* update doc

* bump seed

* fix source test

* update seed spec file

* fix expected spec
  • Loading branch information
girarda authored and etsybaev committed Mar 5, 2022
1 parent d2c9556 commit f439340
Show file tree
Hide file tree
Showing 51 changed files with 966 additions and 517 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
- name: Clickhouse
destinationDefinitionId: ce0d828e-1dc4-496c-b122-2da42e637e48
dockerRepository: airbyte/destination-clickhouse
dockerImageTag: 0.1.3
dockerImageTag: 0.1.4
documentationUrl: https://docs.airbyte.io/integrations/destinations/clickhouse
- name: DynamoDB
destinationDefinitionId: 8ccd8909-4e99-4141-b48d-4984b70b2d89
Expand Down Expand Up @@ -108,7 +108,7 @@
- name: MS SQL Server
destinationDefinitionId: d4353156-9217-4cad-8dd7-c108fd4f74cf
dockerRepository: airbyte/destination-mssql
dockerImageTag: 0.1.14
dockerImageTag: 0.1.15
documentationUrl: https://docs.airbyte.io/integrations/destinations/mssql
icon: mssql.svg
- name: MeiliSearch
Expand All @@ -126,19 +126,19 @@
- name: MySQL
destinationDefinitionId: ca81ee7c-3163-4246-af40-094cc31e5e42
dockerRepository: airbyte/destination-mysql
dockerImageTag: 0.1.17
dockerImageTag: 0.1.18
documentationUrl: https://docs.airbyte.io/integrations/destinations/mysql
icon: mysql.svg
- name: Oracle
destinationDefinitionId: 3986776d-2319-4de9-8af8-db14c0996e72
dockerRepository: airbyte/destination-oracle
dockerImageTag: 0.1.13
dockerImageTag: 0.1.15
documentationUrl: https://docs.airbyte.io/integrations/destinations/oracle
icon: oracle.svg
- name: Postgres
destinationDefinitionId: 25c5221d-dce2-4163-ade9-739ef790f503
dockerRepository: airbyte/destination-postgres
dockerImageTag: 0.3.14
dockerImageTag: 0.3.15
documentationUrl: https://docs.airbyte.io/integrations/destinations/postgres
icon: postgresql.svg
- name: Pulsar
Expand All @@ -162,7 +162,7 @@
- name: Redshift
destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
dockerRepository: airbyte/destination-redshift
dockerImageTag: 0.3.26
dockerImageTag: 0.3.27
documentationUrl: https://docs.airbyte.io/integrations/destinations/redshift
icon: redshift.svg
- name: Rockset
Expand All @@ -185,7 +185,7 @@
- name: Snowflake
destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba
dockerRepository: airbyte/destination-snowflake
dockerImageTag: 0.4.16
dockerImageTag: 0.4.17
documentationUrl: https://docs.airbyte.io/integrations/destinations/snowflake
icon: snowflake.svg
resourceRequirements:
Expand All @@ -207,7 +207,7 @@
- name: MariaDB ColumnStore
destinationDefinitionId: 294a4790-429b-40ae-9516-49826b9702e1
dockerRepository: airbyte/destination-mariadb-columnstore
dockerImageTag: 0.1.3
dockerImageTag: 0.1.4
documentationUrl: https://docs.airbyte.io/integrations/destinations/mariadb-columnstore
icon: mariadb.svg
- name: Streamr
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-clickhouse:0.1.3"
- dockerImage: "airbyte/destination-clickhouse:0.1.4"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/clickhouse"
connectionSpecification:
Expand Down Expand Up @@ -2077,7 +2077,7 @@
supportsDBT: false
supported_destination_sync_modes:
- "append"
- dockerImage: "airbyte/destination-mssql:0.1.14"
- dockerImage: "airbyte/destination-mssql:0.1.15"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/mssql"
connectionSpecification:
Expand Down Expand Up @@ -2464,7 +2464,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-mysql:0.1.17"
- dockerImage: "airbyte/destination-mysql:0.1.18"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/mysql"
connectionSpecification:
Expand Down Expand Up @@ -2629,7 +2629,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-oracle:0.1.13"
- dockerImage: "airbyte/destination-oracle:0.1.15"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/oracle"
connectionSpecification:
Expand Down Expand Up @@ -2853,11 +2853,11 @@
order: 4
supportsIncremental: true
supportsNormalization: false
supportsDBT: true
supportsDBT: false
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-postgres:0.3.14"
- dockerImage: "airbyte/destination-postgres:0.3.15"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/postgres"
connectionSpecification:
Expand Down Expand Up @@ -3272,7 +3272,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-redshift:0.3.26"
- dockerImage: "airbyte/destination-redshift:0.3.27"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/redshift"
connectionSpecification:
Expand Down Expand Up @@ -3825,7 +3825,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-snowflake:0.4.16"
- dockerImage: "airbyte/destination-snowflake:0.4.17"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/snowflake"
connectionSpecification:
Expand Down Expand Up @@ -4078,7 +4078,7 @@
- "overwrite"
- "append"
- "append_dedup"
- dockerImage: "airbyte/destination-mariadb-columnstore:0.1.3"
- dockerImage: "airbyte/destination-mariadb-columnstore:0.1.4"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/mariadb-columnstore"
connectionSpecification:
Expand Down
40 changes: 10 additions & 30 deletions airbyte-db/lib/src/main/java/io/airbyte/db/Databases.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.db;

import com.google.common.collect.Maps;
import io.airbyte.commons.lang.Exceptions;
import io.airbyte.db.bigquery.BigQueryDatabase;
import io.airbyte.db.jdbc.DefaultJdbcDatabase;
Expand All @@ -15,7 +16,6 @@
import io.airbyte.db.mongodb.MongoDatabase;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import lombok.val;
import org.apache.commons.dbcp2.BasicDataSource;
Expand All @@ -41,7 +41,7 @@ public static Database createPostgresDatabaseWithRetry(final String username,
try {
val infinity = Integer.MAX_VALUE;
database = createPostgresDatabaseWithRetryTimeout(username, password, jdbcConnectionString, isDbReady, infinity);
} catch (IOException e) {
} catch (final IOException e) {
// This should theoretically never happen since we set the timeout to be a very high number.
}
}
Expand Down Expand Up @@ -131,9 +131,9 @@ public static Database createDatabase(final String username,
final String jdbcConnectionString,
final String driverClassName,
final SQLDialect dialect,
final String connectionProperties) {
final Map<String, String> connectionProperties) {
final BasicDataSource connectionPool =
createBasicDataSource(username, password, jdbcConnectionString, driverClassName, Optional.ofNullable(connectionProperties));
createBasicDataSource(username, password, jdbcConnectionString, driverClassName, connectionProperties);

return new Database(connectionPool, dialect);
}
Expand All @@ -159,7 +159,7 @@ public static JdbcDatabase createJdbcDatabase(final String username,
final String password,
final String jdbcConnectionString,
final String driverClassName,
final String connectionProperties) {
final Map<String, String> connectionProperties) {
return createJdbcDatabase(username, password, jdbcConnectionString, driverClassName, connectionProperties,
JdbcUtils.getDefaultSourceOperations());
}
Expand All @@ -168,10 +168,10 @@ public static JdbcDatabase createJdbcDatabase(final String username,
final String password,
final String jdbcConnectionString,
final String driverClassName,
final String connectionProperties,
final Map<String, String> connectionProperties,
final JdbcCompatibleSourceOperations<?> sourceOperations) {
final BasicDataSource connectionPool =
createBasicDataSource(username, password, jdbcConnectionString, driverClassName, Optional.ofNullable(connectionProperties));
createBasicDataSource(username, password, jdbcConnectionString, driverClassName, connectionProperties);

return new DefaultJdbcDatabase(connectionPool, sourceOperations);
}
Expand All @@ -181,10 +181,10 @@ public static JdbcDatabase createStreamingJdbcDatabase(final String username,
final String jdbcConnectionString,
final String driverClassName,
final JdbcStreamingQueryConfiguration jdbcStreamingQuery,
final String connectionProperties,
final Map<String, String> connectionProperties,
final JdbcCompatibleSourceOperations<?> sourceOperations) {
final BasicDataSource connectionPool =
createBasicDataSource(username, password, jdbcConnectionString, driverClassName, Optional.ofNullable(connectionProperties));
createBasicDataSource(username, password, jdbcConnectionString, driverClassName, connectionProperties);

return new StreamingJdbcDatabase(connectionPool, sourceOperations, jdbcStreamingQuery);
}
Expand All @@ -194,27 +194,7 @@ private static BasicDataSource createBasicDataSource(final String username,
final String jdbcConnectionString,
final String driverClassName) {
return createBasicDataSource(username, password, jdbcConnectionString, driverClassName,
Optional.empty());
}

/**
* Prefer to use the method that takes in the connection properties as a map.
*/
@Deprecated
private static BasicDataSource createBasicDataSource(final String username,
final String password,
final String jdbcConnectionString,
final String driverClassName,
final Optional<String> connectionProperties) {
final BasicDataSource connectionPool = new BasicDataSource();
connectionPool.setDriverClassName(driverClassName);
connectionPool.setUsername(username);
connectionPool.setPassword(password);
connectionPool.setInitialSize(0);
connectionPool.setMaxTotal(5);
connectionPool.setUrl(jdbcConnectionString);
connectionProperties.ifPresent(connectionPool::setConnectionProperties);
return connectionPool;
Maps.newHashMap());
}

public static BasicDataSource createBasicDataSource(final String username,
Expand Down
30 changes: 30 additions & 0 deletions airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/JdbcUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

package io.airbyte.db.jdbc;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.Maps;
import java.util.HashMap;
import java.util.Map;
import org.jooq.JSONFormat;

public class JdbcUtils {
Expand All @@ -24,4 +28,30 @@ public static String getFullyQualifiedTableName(final String schemaName, final S
return schemaName != null ? schemaName + "." + tableName : tableName;
}

public static Map<String, String> parseJdbcParameters(final JsonNode config, final String jdbcUrlParamsKey) {
if (config.has(jdbcUrlParamsKey)) {
return parseJdbcParameters(config.get(jdbcUrlParamsKey).asText());
} else {
return Maps.newHashMap();
}
}

public static Map<String, String> parseJdbcParameters(final String jdbcPropertiesString) {
final Map<String, String> parameters = new HashMap<>();
if (!jdbcPropertiesString.isBlank()) {
final String[] keyValuePairs = jdbcPropertiesString.split("&");
for (final String kv : keyValuePairs) {
final String[] split = kv.split("=");
if (split.length == 2) {
parameters.put(split[0], split[1]);
} else {
throw new IllegalArgumentException(
"jdbc_url_params must be formatted as 'key=value' pairs separated by the symbol '&'. (example: key1=value1&key2=value2&key3=value3). Got "
+ jdbcPropertiesString);
}
}
}
return parameters;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION destination-clickhouse-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.1
LABEL io.airbyte.version=0.1.4
LABEL io.airbyte.name=airbyte/destination-clickhouse-strict-encrypt
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,21 @@
"supportsIncremental": true,
"supportsNormalization": true,
"supportsDBT": false,
"supported_destination_sync_modes": ["overwrite", "append", "append_dedup"],
"supported_destination_sync_modes": [
"overwrite",
"append",
"append_dedup"
],
"connectionSpecification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "ClickHouse Destination Spec",
"type": "object",
"required": ["host", "port", "database", "username"],
"required": [
"host",
"port",
"database",
"username"
],
"additionalProperties": true,
"properties": {
"host": {
Expand All @@ -24,27 +33,41 @@
"minimum": 0,
"maximum": 65536,
"default": 8123,
"examples": ["8123"],
"examples": [
"8123"
],
"order": 1
},
"tcp-port": {
"title": "Native Port",
"description": "Native port (not the JDBC) of the database.",
"type": "integer",
"minimum": 0,
"maximum": 65536,
"default": 9000,
"examples": [
"9000"
],
"order": 2
},
"database": {
"title": "DB Name",
"description": "Name of the database.",
"type": "string",
"order": 2
"order": 3
},
"username": {
"title": "User",
"description": "Username to use to access the database.",
"type": "string",
"order": 3
"order": 4
},
"password": {
"title": "Password",
"description": "Password associated with the username.",
"type": "string",
"airbyte_secret": true,
"order": 4
"order": 5
},
"tunnel_method": {
"type": "object",
Expand All @@ -53,7 +76,9 @@
"oneOf": [
{
"title": "No Tunnel",
"required": ["tunnel_method"],
"required": [
"tunnel_method"
],
"properties": {
"tunnel_method": {
"description": "No ssh tunnel needed to connect to database",
Expand Down Expand Up @@ -92,7 +117,9 @@
"minimum": 0,
"maximum": 65536,
"default": 22,
"examples": ["22"],
"examples": [
"22"
],
"order": 2
},
"tunnel_user": {
Expand Down Expand Up @@ -140,7 +167,9 @@
"minimum": 0,
"maximum": 65536,
"default": 22,
"examples": ["22"],
"examples": [
"22"
],
"order": 2
},
"tunnel_user": {
Expand Down
Loading

0 comments on commit f439340

Please sign in to comment.