Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Source CockroachDB: fix connector replication failure due to multiple open portals error #10235

Merged
merged 4 commits into from
Feb 25, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@
- name: Cockroachdb
sourceDefinitionId: 9fa5862c-da7c-11eb-8d19-0242ac130003
dockerRepository: airbyte/source-cockroachdb
dockerImageTag: 0.1.9
dockerImageTag: 0.1.10
documentationUrl: https://docs.airbyte.io/integrations/sources/cockroachdb
icon: cockroachdb.svg
sourceType: database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1205,7 +1205,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-cockroachdb:0.1.9"
- dockerImage: "airbyte/source-cockroachdb:0.1.10"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/cockroachdb"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-cockroachdb-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.6
LABEL io.airbyte.version=0.1.7
LABEL io.airbyte.name=airbyte/source-cockroachdb-strict-encrypt
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-cockroachdb

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.9
LABEL io.airbyte.version=0.1.10
LABEL io.airbyte.name=airbyte/source-cockroachdb
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
import io.airbyte.commons.functional.CheckedFunction;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.db.Databases;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.db.jdbc.PostgresJdbcStreamingQueryConfiguration;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.base.ssh.SshWrappedSource;
Expand Down Expand Up @@ -39,7 +39,7 @@ public class CockroachDbSource extends AbstractJdbcSource<JDBCType> {
public static final List<String> PORT_KEY = List.of("port");

public CockroachDbSource() {
super(DRIVER_CLASS, new PostgresJdbcStreamingQueryConfiguration(), new CockroachJdbcSourceOperations());
super(DRIVER_CLASS, null, new CockroachJdbcSourceOperations());
}

public static Source sshWrappedSource() {
Expand Down Expand Up @@ -110,12 +110,29 @@ protected boolean isNotInternalSchema(JsonNode jsonNode, Set<String> internalSch
return false;
}

/**
 * Creates the database handle used by this source.
 *
 * <p>The connector config is first translated with {@code toDatabaseConfig}. The resulting
 * {@code username} and {@code jdbc_url} entries are read unconditionally, while {@code password}
 * and {@code connection_properties} are optional and passed on as {@code null} when absent. The
 * JDBC database built from those values is then wrapped in a {@link CockroachJdbcDatabase}.
 *
 * <p>As a side effect, {@code quoteString} is initialised from the database metadata if it has
 * not been set yet.
 *
 * @param config the connector configuration
 * @return a {@link CockroachJdbcDatabase} wrapping the freshly created JDBC database
 * @throws SQLException if the database metadata cannot be retrieved
 */
@Override
public JdbcDatabase createDatabase(final JsonNode config) throws SQLException {
  final JsonNode jdbcConfig = toDatabaseConfig(config);

  // Optional entries fall back to null, mandatory ones are read directly.
  final String username = jdbcConfig.get("username").asText();
  final String password = jdbcConfig.has("password") ? jdbcConfig.get("password").asText() : null;
  final String jdbcUrl = jdbcConfig.get("jdbc_url").asText();
  final String connectionProperties =
      jdbcConfig.has("connection_properties") ? jdbcConfig.get("connection_properties").asText() : null;

  final JdbcDatabase database = Databases.createJdbcDatabase(
      username,
      password,
      jdbcUrl,
      driverClass,
      connectionProperties,
      sourceOperations);

  // Only look up the identifier quote string once; keep any value that is already set.
  if (quoteString == null) {
    quoteString = database.getMetaData().getIdentifierQuoteString();
  }

  return new CockroachJdbcDatabase(database, sourceOperations);
}

private CheckedFunction<Connection, PreparedStatement, SQLException> getPrivileges(JdbcDatabase database) {
return connection -> {
final PreparedStatement ps = connection.prepareStatement(
"SELECT DISTINCT table_catalog, table_schema, table_name, privilege_type\n"
+ "FROM information_schema.table_privileges\n"
+ "WHERE grantee = ? AND privilege_type in ('SELECT', 'ALL')");
+ "WHERE (grantee = ? AND privilege_type in ('SELECT', 'ALL')) OR (table_schema = 'public')");
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is unrelated to the bug, but without this, tables in the public schema are not visible in the Airbyte UI (which feels odd, since tables in the public schema in CockroachDB are visible to all users regardless of explicit grants). Feel free to undo this if this isn't a change you'd like to incorporate in the connector

ps.setString(1, database.getDatabaseConfig().get("username").asText());
return ps;
};
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.cockroachdb;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.functional.CheckedConsumer;
import io.airbyte.commons.functional.CheckedFunction;
import io.airbyte.db.JdbcCompatibleSourceOperations;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.db.jdbc.JdbcStreamingQueryConfiguration;

import javax.sql.DataSource;

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.stream.Stream;

/**
 * A {@link JdbcDatabase} that wraps another {@link JdbcDatabase} and runs
 * {@link #query(String, String...)} as a buffered (non-streamed) query; every other operation is
 * forwarded to the wrapped database unchanged.
 *
 * <p>CockroachDB does not currently support multiple active pgwire portals on the same session,
 * which makes it impossible to replicate tables that have over ~1000 rows using
 * StreamingJdbcDatabase. See: https://go.crdb.dev/issue-v/40195/v21.2 and in particular, the
 * comment:
 * https://github.com/cockroachdb/cockroach/issues/40195?version=v21.2#issuecomment-870570351
 * The same situation described there for kafka-connect applies to StreamingJdbcDatabase.
 */
public class CockroachJdbcDatabase extends JdbcDatabase {

  /** The wrapped database that actually talks to CockroachDB. */
  private final JdbcDatabase database;

  /**
   * Creates a wrapper around an existing database.
   *
   * @param database the database every call is forwarded to
   * @param sourceOperations the source operations handed to the {@link JdbcDatabase} superclass
   */
  public CockroachJdbcDatabase(final JdbcDatabase database,
                               final JdbcCompatibleSourceOperations<?> sourceOperations) {
    super(sourceOperations);
    this.database = database;
  }

  /** Returns the metadata reported by the wrapped database. */
  @Override
  public DatabaseMetaData getMetaData() throws SQLException {
    return database.getMetaData();
  }

  /** Runs {@code query} through the wrapped database. */
  @Override
  public void execute(final CheckedConsumer<Connection, SQLException> query) throws SQLException {
    database.execute(query);
  }

  /** Forwards the buffered result-set query to the wrapped database. */
  @Override
  public <T> List<T> bufferedResultSetQuery(
      final CheckedFunction<Connection, ResultSet, SQLException> query,
      final CheckedFunction<ResultSet, T, SQLException> recordTransform)
      throws SQLException {
    return database.bufferedResultSetQuery(query, recordTransform);
  }

  /** Forwards the result-set query to the wrapped database. */
  @Override
  public <T> Stream<T> resultSetQuery(
      final CheckedFunction<Connection, ResultSet, SQLException> query,
      final CheckedFunction<ResultSet, T, SQLException> recordTransform)
      throws SQLException {
    return database.resultSetQuery(query, recordTransform);
  }

  /** Forwards the prepared-statement query to the wrapped database. */
  @Override
  public <T> Stream<T> query(
      final CheckedFunction<Connection, PreparedStatement, SQLException> statementCreator,
      final CheckedFunction<ResultSet, T, SQLException> recordTransform)
      throws SQLException {
    return database.query(statementCreator, recordTransform);
  }

  /**
   * Executes {@code sql} with the given string parameters via
   * {@link #bufferedResultSetQuery}, converting each row with {@code sourceOperations::rowToJson},
   * and exposes the resulting list as a stream.
   *
   * @param sql the SQL text to prepare
   * @param params values bound, in order, to the statement's parameters starting at index 1
   * @return a stream over the collected rows
   * @throws SQLException if preparing, binding or executing the statement fails
   */
  @Override
  public Stream<JsonNode> query(final String sql, final String... params) throws SQLException {
    final List<JsonNode> rows = bufferedResultSetQuery(
        connection -> bindParameters(connection.prepareStatement(sql), params).executeQuery(),
        sourceOperations::rowToJson);
    return rows.stream();
  }

  /** Binds each entry of {@code params} as a string parameter (1-based) and returns the statement. */
  private static PreparedStatement bindParameters(final PreparedStatement statement,
                                                  final String[] params)
      throws SQLException {
    for (int index = 0; index < params.length; index++) {
      statement.setString(index + 1, params[index]);
    }
    return statement;
  }

  /** Closes the wrapped database. */
  @Override
  public void close() throws Exception {
    database.close();
  }

}
1 change: 1 addition & 0 deletions docs/integrations/sources/cockroachdb.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ Your database user should now be ready for use with Airbyte.

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.10 | 2022-02-24 | [10235](https://github.com/airbytehq/airbyte/pull/10235) | Fix replication failure caused by the multiple open portals error |
| 0.1.9 | 2022-02-21 | [10242](https://github.com/airbytehq/airbyte/pull/10242) | Fixed cursor for old connectors that use non-microsecond format. Now connectors work with both formats |
| 0.1.8 | 2022-02-18 | [10242](https://github.com/airbytehq/airbyte/pull/10242) | Updated timestamp transformation with microseconds |
| 0.1.7 | 2022-02-14 | [10256](https://github.com/airbytehq/airbyte/pull/10256) | Add `-XX:+ExitOnOutOfMemoryError` JVM option |
Expand Down