From ce8f6db0c42d73c1d0c5b7888b4cb5b9a3c88d00 Mon Sep 17 00:00:00 2001 From: lshrinivas Date: Wed, 9 Feb 2022 18:29:45 -0500 Subject: [PATCH 1/2] fix cockroachdb connector replication failure due to multiple open portals error --- .../source/cockroachdb/CockroachDbSource.java | 23 ++++- .../cockroachdb/CockroachJdbcDatabase.java | 95 +++++++++++++++++++ 2 files changed, 115 insertions(+), 3 deletions(-) create mode 100644 airbyte-integrations/connectors/source-cockroachdb/src/main/java/io/airbyte/integrations/source/cockroachdb/CockroachJdbcDatabase.java diff --git a/airbyte-integrations/connectors/source-cockroachdb/src/main/java/io/airbyte/integrations/source/cockroachdb/CockroachDbSource.java b/airbyte-integrations/connectors/source-cockroachdb/src/main/java/io/airbyte/integrations/source/cockroachdb/CockroachDbSource.java index 6221d8eddb2f..7a200b0e3d46 100644 --- a/airbyte-integrations/connectors/source-cockroachdb/src/main/java/io/airbyte/integrations/source/cockroachdb/CockroachDbSource.java +++ b/airbyte-integrations/connectors/source-cockroachdb/src/main/java/io/airbyte/integrations/source/cockroachdb/CockroachDbSource.java @@ -9,8 +9,8 @@ import io.airbyte.commons.functional.CheckedFunction; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.util.AutoCloseableIterator; +import io.airbyte.db.Databases; import io.airbyte.db.jdbc.JdbcDatabase; -import io.airbyte.db.jdbc.PostgresJdbcStreamingQueryConfiguration; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.base.Source; import io.airbyte.integrations.base.ssh.SshWrappedSource; @@ -39,7 +39,7 @@ public class CockroachDbSource extends AbstractJdbcSource { public static final List PORT_KEY = List.of("port"); public CockroachDbSource() { - super(DRIVER_CLASS, new PostgresJdbcStreamingQueryConfiguration(), new CockroachJdbcSourceOperations()); + super(DRIVER_CLASS, null, new CockroachJdbcSourceOperations()); } public static Source sshWrappedSource() { @@ -110,12 
+110,29 @@ protected boolean isNotInternalSchema(JsonNode jsonNode, Set internalSch return false; } + @Override + public JdbcDatabase createDatabase(final JsonNode config) throws SQLException { + final JsonNode jdbcConfig = toDatabaseConfig(config); + + final JdbcDatabase database = Databases.createJdbcDatabase( + jdbcConfig.get("username").asText(), + jdbcConfig.has("password") ? jdbcConfig.get("password").asText() : null, + jdbcConfig.get("jdbc_url").asText(), + driverClass, + jdbcConfig.has("connection_properties") ? jdbcConfig.get("connection_properties").asText() : null, + sourceOperations); + + quoteString = (quoteString == null ? database.getMetaData().getIdentifierQuoteString() : quoteString); + + return new CockroachJdbcDatabase(database, sourceOperations); + } + private CheckedFunction getPrivileges(JdbcDatabase database) { return connection -> { final PreparedStatement ps = connection.prepareStatement( "SELECT DISTINCT table_catalog, table_schema, table_name, privilege_type\n" + "FROM information_schema.table_privileges\n" - + "WHERE grantee = ? AND privilege_type in ('SELECT', 'ALL')"); + + "WHERE (grantee = ? AND privilege_type in ('SELECT', 'ALL')) OR (table_schema = 'public')"); ps.setString(1, database.getDatabaseConfig().get("username").asText()); return ps; }; diff --git a/airbyte-integrations/connectors/source-cockroachdb/src/main/java/io/airbyte/integrations/source/cockroachdb/CockroachJdbcDatabase.java b/airbyte-integrations/connectors/source-cockroachdb/src/main/java/io/airbyte/integrations/source/cockroachdb/CockroachJdbcDatabase.java new file mode 100644 index 000000000000..9df77e4e0af5 --- /dev/null +++ b/airbyte-integrations/connectors/source-cockroachdb/src/main/java/io/airbyte/integrations/source/cockroachdb/CockroachJdbcDatabase.java @@ -0,0 +1,95 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.integrations.source.cockroachdb; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.functional.CheckedConsumer; +import io.airbyte.commons.functional.CheckedFunction; +import io.airbyte.db.JdbcCompatibleSourceOperations; +import io.airbyte.db.jdbc.JdbcDatabase; +import io.airbyte.db.jdbc.JdbcStreamingQueryConfiguration; + +import javax.sql.DataSource; + +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.List; +import java.util.stream.Stream; + +/** + * This implementation uses non-streamed queries to CockroachDB. CockroachDB + * does not currently support multiple active pgwire portals on the same session, + * which makes it impossible to replicate tables that have over ~1000 rows + * using StreamingJdbcDatabase. See: https://go.crdb.dev/issue-v/40195/v21.2 + * and in particular, the comment: + * https://github.com/cockroachdb/cockroach/issues/40195?version=v21.2#issuecomment-870570351 + * The same situation as kafka-connect applies to StreamingJdbcDatabase + */ +public class CockroachJdbcDatabase + extends JdbcDatabase +{ + + private final JdbcDatabase database; + + public CockroachJdbcDatabase(final JdbcDatabase database, + final JdbcCompatibleSourceOperations sourceOperations) { + super(sourceOperations); + this.database = database; + } + + @Override + public DatabaseMetaData getMetaData() throws SQLException { + return database.getMetaData(); + } + + @Override + public void execute(final CheckedConsumer query) throws SQLException { + database.execute(query); + } + + @Override + public List bufferedResultSetQuery(final CheckedFunction query, + final CheckedFunction recordTransform) + throws SQLException { + return database.bufferedResultSetQuery(query, recordTransform); + } + + @Override + public Stream resultSetQuery(final CheckedFunction query, + final CheckedFunction 
recordTransform) + throws SQLException { + return database.resultSetQuery(query, recordTransform); + } + + @Override + public Stream query(final CheckedFunction statementCreator, + final CheckedFunction recordTransform) + throws SQLException { + return database.query(statementCreator, recordTransform); + } + + @Override + public Stream query(final String sql, final String... params) throws SQLException { + return bufferedResultSetQuery(connection -> { + final PreparedStatement statement = connection.prepareStatement(sql); + int i = 1; + for (final String param : params) { + statement.setString(i, param); + ++i; + } + return statement.executeQuery(); + }, sourceOperations::rowToJson).stream(); + + } + + @Override + public void close() throws Exception { + database.close(); + } + +} From 7bc045bb2b29faf990f2bc5843f9e982b05165cd Mon Sep 17 00:00:00 2001 From: marcosmarxm Date: Thu, 24 Feb 2022 23:16:31 -0300 Subject: [PATCH 2/2] bump connector version --- .../init/src/main/resources/seed/source_definitions.yaml | 2 +- airbyte-config/init/src/main/resources/seed/source_specs.yaml | 2 +- .../connectors/source-cockroachdb-strict-encrypt/Dockerfile | 2 +- airbyte-integrations/connectors/source-cockroachdb/Dockerfile | 2 +- docs/integrations/sources/cockroachdb.md | 1 + 5 files changed, 5 insertions(+), 4 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 71f5bf3067f0..abe908d8930e 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -140,7 +140,7 @@ - name: Cockroachdb sourceDefinitionId: 9fa5862c-da7c-11eb-8d19-0242ac130003 dockerRepository: airbyte/source-cockroachdb - dockerImageTag: 0.1.9 + dockerImageTag: 0.1.10 documentationUrl: https://docs.airbyte.io/integrations/sources/cockroachdb icon: cockroachdb.svg sourceType: database diff --git 
a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index ef47d6600745..3dba49e10132 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -1205,7 +1205,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-cockroachdb:0.1.9" +- dockerImage: "airbyte/source-cockroachdb:0.1.10" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/cockroachdb" connectionSpecification: diff --git a/airbyte-integrations/connectors/source-cockroachdb-strict-encrypt/Dockerfile b/airbyte-integrations/connectors/source-cockroachdb-strict-encrypt/Dockerfile index 72d6e2647861..aebdad56b6b7 100644 --- a/airbyte-integrations/connectors/source-cockroachdb-strict-encrypt/Dockerfile +++ b/airbyte-integrations/connectors/source-cockroachdb-strict-encrypt/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-cockroachdb-strict-encrypt COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.1.6 +LABEL io.airbyte.version=0.1.7 LABEL io.airbyte.name=airbyte/source-cockroachdb-strict-encrypt diff --git a/airbyte-integrations/connectors/source-cockroachdb/Dockerfile b/airbyte-integrations/connectors/source-cockroachdb/Dockerfile index d174645aefc0..d0a0ca9cff1e 100644 --- a/airbyte-integrations/connectors/source-cockroachdb/Dockerfile +++ b/airbyte-integrations/connectors/source-cockroachdb/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-cockroachdb COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.1.9 +LABEL io.airbyte.version=0.1.10 LABEL io.airbyte.name=airbyte/source-cockroachdb diff --git a/docs/integrations/sources/cockroachdb.md b/docs/integrations/sources/cockroachdb.md index cabddbc46671..cae8045ecc96 100644 --- a/docs/integrations/sources/cockroachdb.md +++ b/docs/integrations/sources/cockroachdb.md @@ -95,6 +95,7 @@ 
Your database user should now be ready for use with Airbyte. | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | +| 0.1.10 | 2022-02-24 | [10235](https://github.com/airbytehq/airbyte/pull/10235) | Fix replication failure due to multiple open portals error | | 0.1.9 | 2022-02-21 | [10242](https://github.com/airbytehq/airbyte/pull/10242) | Fixed cursor for old connectors that use non-microsecond format. Now connectors work with both formats | | 0.1.8 | 2022-02-18 | [10242](https://github.com/airbytehq/airbyte/pull/10242) | Updated timestamp transformation with microseconds | | 0.1.7 | 2022-02-14 | [10256](https://github.com/airbytehq/airbyte/pull/10256) | Add `-XX:+ExitOnOutOfMemoryError` JVM option |