Skip to content

Commit

Permalink
Fix CockroachDbSource compilation error (#10731)
Browse files Browse the repository at this point in the history
* Fix CockroachDbSource compilation error

* fix test too
  • Loading branch information
girarda authored Feb 28, 2022
1 parent 23562ff commit 13719a2
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.db.Databases;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.base.ssh.SshWrappedSource;
Expand Down Expand Up @@ -106,7 +107,7 @@ public Set<JdbcPrivilegeDto> getPrivilegesTableForCurrentUser(final JdbcDatabase
}

@Override
protected boolean isNotInternalSchema(JsonNode jsonNode, Set<String> internalSchemas) {
protected boolean isNotInternalSchema(final JsonNode jsonNode, final Set<String> internalSchemas) {
return false;
}

Expand All @@ -119,15 +120,15 @@ public JdbcDatabase createDatabase(final JsonNode config) throws SQLException {
jdbcConfig.has("password") ? jdbcConfig.get("password").asText() : null,
jdbcConfig.get("jdbc_url").asText(),
driverClass,
jdbcConfig.has("connection_properties") ? jdbcConfig.get("connection_properties").asText() : null,
JdbcUtils.parseJdbcParameters(jdbcConfig, "connection_properties"),
sourceOperations);

quoteString = (quoteString == null ? database.getMetaData().getIdentifierQuoteString() : quoteString);

return new CockroachJdbcDatabase(database, sourceOperations);
}

private CheckedFunction<Connection, PreparedStatement, SQLException> getPrivileges(JdbcDatabase database) {
private CheckedFunction<Connection, PreparedStatement, SQLException> getPrivileges(final JdbcDatabase database) {
return connection -> {
final PreparedStatement ps = connection.prepareStatement(
"SELECT DISTINCT table_catalog, table_schema, table_name, privilege_type\n"
Expand All @@ -138,7 +139,7 @@ private CheckedFunction<Connection, PreparedStatement, SQLException> getPrivileg
};
}

private JdbcPrivilegeDto getPrivilegeDto(JsonNode jsonNode) {
private JdbcPrivilegeDto getPrivilegeDto(final JsonNode jsonNode) {
return JdbcPrivilegeDto.builder()
.schemaName(jsonNode.get("table_schema").asText())
.tableName(jsonNode.get("table_name").asText())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.airbyte.commons.string.Strings;
import io.airbyte.commons.util.MoreIterators;
import io.airbyte.db.Databases;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
import io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest;
import io.airbyte.integrations.source.relationaldb.models.DbState;
Expand Down Expand Up @@ -85,7 +86,7 @@ public void setup() throws Exception {
jdbcConfig.has("password") ? jdbcConfig.get("password").asText() : null,
jdbcConfig.get("jdbc_url").asText(),
getDriverClass(),
jdbcConfig.has("connection_properties") ? jdbcConfig.get("connection_properties").asText() : null);
JdbcUtils.parseJdbcParameters(jdbcConfig, "connection_properties"));

database.execute(connection -> connection.createStatement().execute("CREATE DATABASE " + config.get("database") + ";"));
super.setup();
Expand Down Expand Up @@ -120,28 +121,28 @@ static void cleanUp() {
protected AirbyteCatalog getCatalog(final String defaultNamespace) {
return new AirbyteCatalog().withStreams(Lists.newArrayList(
CatalogHelpers.createAirbyteStream(
TABLE_NAME,
defaultNamespace,
Field.of(COL_ID, JsonSchemaType.NUMBER),
Field.of(COL_NAME, JsonSchemaType.STRING),
Field.of(COL_UPDATED_AT, JsonSchemaType.STRING))
TABLE_NAME,
defaultNamespace,
Field.of(COL_ID, JsonSchemaType.NUMBER),
Field.of(COL_NAME, JsonSchemaType.STRING),
Field.of(COL_UPDATED_AT, JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of(COL_ID))),
CatalogHelpers.createAirbyteStream(
TABLE_NAME_WITHOUT_PK,
defaultNamespace,
Field.of(COL_ID, JsonSchemaType.NUMBER),
Field.of(COL_NAME, JsonSchemaType.STRING),
Field.of(COL_UPDATED_AT, JsonSchemaType.STRING),
Field.of(COL_ROW_ID, JsonSchemaType.NUMBER))
TABLE_NAME_WITHOUT_PK,
defaultNamespace,
Field.of(COL_ID, JsonSchemaType.NUMBER),
Field.of(COL_NAME, JsonSchemaType.STRING),
Field.of(COL_UPDATED_AT, JsonSchemaType.STRING),
Field.of(COL_ROW_ID, JsonSchemaType.NUMBER))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of(COL_ROW_ID))),
CatalogHelpers.createAirbyteStream(
TABLE_NAME_COMPOSITE_PK,
defaultNamespace,
Field.of(COL_FIRST_NAME, JsonSchemaType.STRING),
Field.of(COL_LAST_NAME, JsonSchemaType.STRING),
Field.of(COL_UPDATED_AT, JsonSchemaType.STRING))
TABLE_NAME_COMPOSITE_PK,
defaultNamespace,
Field.of(COL_FIRST_NAME, JsonSchemaType.STRING),
Field.of(COL_LAST_NAME, JsonSchemaType.STRING),
Field.of(COL_UPDATED_AT, JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(
List.of(List.of(COL_FIRST_NAME), List.of(COL_LAST_NAME)))));
Expand Down

0 comments on commit 13719a2

Please sign in to comment.