Skip to content

Commit

Permalink
🐛Destination-Snowflake: updated check method to handle more errors (#…
Browse files Browse the repository at this point in the history
…18970)

* [16833] Destination-Snowflake: updated check method to handle "No Active Warehouse" error or user has incorrect permissions
  • Loading branch information
etsybaev authored and akashkulk committed Nov 17, 2022
1 parent 8289adc commit b5c1955
Show file tree
Hide file tree
Showing 12 changed files with 123 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@
- name: Snowflake
destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba
dockerRepository: airbyte/destination-snowflake
dockerImageTag: 0.4.38
dockerImageTag: 0.4.39
documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake
icon: snowflake.svg
normalizationRepository: airbyte/normalization-snowflake
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5408,7 +5408,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-snowflake:0.4.38"
- dockerImage: "airbyte/destination-snowflake:0.4.39"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/destinations/snowflake"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.exceptions.ConnectionErrorException;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.map.MoreMaps;
import io.airbyte.db.factory.DataSourceFactory;
import io.airbyte.db.jdbc.DefaultJdbcDatabase;
Expand All @@ -21,8 +22,10 @@
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
Expand Down Expand Up @@ -84,11 +87,38 @@ public AirbyteConnectionStatus check(final JsonNode config) {
}
}

/**
* This method is deprecated. It verifies table creation, but not insert right to a newly created
* table. Use attemptTableOperations with the attemptInsert argument instead.
*/
@Deprecated
public static void attemptSQLCreateAndDropTableOperations(final String outputSchema,
final JdbcDatabase database,
final NamingConventionTransformer namingResolver,
final SqlOperations sqlOps)
throws Exception {
attemptTableOperations(outputSchema, database, namingResolver, sqlOps, false);
}

/**
* Verifies if provided creds has enough permissions. Steps are: 1. Create schema if not exists. 2.
* Create test table. 3. Insert dummy record to newly created table if "attemptInsert" set to true.
* 4. Delete table created on step 2.
*
* @param outputSchema - schema to tests against.
* @param database - database to tests against.
* @param namingResolver - naming resolver.
* @param sqlOps - SqlOperations object
* @param attemptInsert - set true if need to make attempt to insert dummy records to newly created
* table. Set false to skip insert step.
* @throws Exception
*/
public static void attemptTableOperations(final String outputSchema,
final JdbcDatabase database,
final NamingConventionTransformer namingResolver,
final SqlOperations sqlOps,
final boolean attemptInsert)
throws Exception {
// verify we have write permissions on the target schema by creating a table with a random name,
// then dropping that table
try {
Expand All @@ -100,7 +130,14 @@ public static void attemptSQLCreateAndDropTableOperations(final String outputSch
final String outputTableName = namingResolver.getIdentifier("_airbyte_connection_test_" + UUID.randomUUID().toString().replaceAll("-", ""));
sqlOps.createSchemaIfNotExists(database, outputSchema);
sqlOps.createTableIfNotExists(database, outputSchema, outputTableName);
sqlOps.dropTableIfExists(database, outputSchema, outputTableName);
// verify if user has permission to make SQL INSERT queries
try {
if (attemptInsert) {
sqlOps.insertRecords(database, List.of(getDummyRecord()), outputSchema, outputTableName);
}
} finally {
sqlOps.dropTableIfExists(database, outputSchema, outputTableName);
}
} catch (final SQLException e) {
if (Objects.isNull(e.getCause()) || !(e.getCause() instanceof SQLException)) {
throw new ConnectionErrorException(e.getSQLState(), e.getErrorCode(), e.getMessage(), e);
Expand All @@ -113,6 +150,19 @@ public static void attemptSQLCreateAndDropTableOperations(final String outputSch
}
}

/**
* Generates a dummy AirbyteRecordMessage with random values.
*
* @return AirbyteRecordMessage object with dummy values that may be used to test insert permission.
*/
private static AirbyteRecordMessage getDummyRecord() {
final JsonNode dummyDataToInsert = Jsons.deserialize("{ \"field1\": true }");
return new AirbyteRecordMessage()
.withStream("stream1")
.withData(dummyDataToInsert)
.withEmittedAt(1602637589000L);
}

protected DataSource getDataSource(final JsonNode config) {
final JsonNode jdbcConfig = toJdbcConfig(config);
return DataSourceFactory.create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.airbyte.integrations.base.AirbyteTraceMessageUtility;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.destination.NamingConventionTransformer;
import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination;
import io.airbyte.integrations.destination.jdbc.SqlOperations;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
Expand Down Expand Up @@ -68,7 +69,7 @@ public AirbyteConnectionStatus check(final JsonNode config) {
final JdbcDatabase database = getDatabase(dataSource);
final var nameTransformer = getNameTransformer();
final var outputSchema = nameTransformer.convertStreamName(config.get(schemaFieldName).asText());
AbstractJdbcDestination.attemptSQLCreateAndDropTableOperations(outputSchema, database, nameTransformer, getSqlOperations());
performCreateInsertTestOnDestination(outputSchema, database, nameTransformer);

return new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED);
} catch (final ConnectionErrorException ex) {
Expand All @@ -92,4 +93,11 @@ public AirbyteConnectionStatus check(final JsonNode config) {
}
}

protected void performCreateInsertTestOnDestination(final String outputSchema,
final JdbcDatabase database,
final NamingConventionTransformer nameTransformer)
throws Exception {
AbstractJdbcDestination.attemptSQLCreateAndDropTableOperations(outputSchema, database, nameTransformer, getSqlOperations());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@ RUN tar xf ${APPLICATION}.tar --strip-components=1

ENV ENABLE_SENTRY true

LABEL io.airbyte.version=0.4.38
LABEL io.airbyte.version=0.4.39
LABEL io.airbyte.name=airbyte/destination-snowflake
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ dependencies {
// this is a configuration to make mockito work with final classes
testImplementation 'org.mockito:mockito-inline:2.13.0'


integrationTestJavaImplementation project(':airbyte-commons-worker')
integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-snowflake')
integrationTestJavaImplementation 'org.apache.commons:commons-lang3:3.11'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.destination.NamingConventionTransformer;
import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination;
import io.airbyte.integrations.destination.jdbc.SqlOperations;
import io.airbyte.integrations.destination.jdbc.copy.CopyConsumerFactory;
import io.airbyte.integrations.destination.jdbc.copy.CopyDestination;
Expand Down Expand Up @@ -63,6 +65,15 @@ public SqlOperations getSqlOperations() {
return new SnowflakeSqlOperations();
}

@Override
protected void performCreateInsertTestOnDestination(final String outputSchema,
final JdbcDatabase database,
final NamingConventionTransformer nameTransformer)
throws Exception {
AbstractJdbcDestination.attemptTableOperations(outputSchema, database, nameTransformer,
getSqlOperations(), true);
}

private String getConfiguredSchema(final JsonNode config) {
return config.get("schema").asText();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ public AirbyteConnectionStatus check(final JsonNode config) {
try {
final JdbcDatabase database = getDatabase(dataSource);
final String outputSchema = super.getNamingResolver().getIdentifier(config.get("schema").asText());
attemptSQLCreateAndDropTableOperations(outputSchema, database, nameTransformer, snowflakeGcsStagingSqlOperations);

attemptTableOperations(outputSchema, database, nameTransformer, snowflakeGcsStagingSqlOperations,
true);
attemptWriteAndDeleteGcsObject(gcsConfig, outputSchema);

return new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ public AirbyteConnectionStatus check(final JsonNode config) {
try {
final JdbcDatabase database = getDatabase(dataSource);
final String outputSchema = nameTransformer.getIdentifier(config.get("schema").asText());
attemptSQLCreateAndDropTableOperations(outputSchema, database, nameTransformer, snowflakeInternalStagingSqlOperations);
attemptSQLCreateAndDropStages(outputSchema, database, nameTransformer, snowflakeInternalStagingSqlOperations);
attemptTableOperations(outputSchema, database, nameTransformer,
snowflakeInternalStagingSqlOperations, true);
attemptStageOperations(outputSchema, database, nameTransformer, snowflakeInternalStagingSqlOperations);
return new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED);
} catch (final Exception e) {
LOGGER.error("Exception while checking connection: ", e);
Expand All @@ -63,10 +64,10 @@ public AirbyteConnectionStatus check(final JsonNode config) {
}
}

private static void attemptSQLCreateAndDropStages(final String outputSchema,
final JdbcDatabase database,
final NamingConventionTransformer namingResolver,
final SnowflakeInternalStagingSqlOperations sqlOperations)
private static void attemptStageOperations(final String outputSchema,
final JdbcDatabase database,
final NamingConventionTransformer namingResolver,
final SnowflakeInternalStagingSqlOperations sqlOperations)
throws Exception {

// verify we have permissions to create/drop stage
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,9 @@ public AirbyteConnectionStatus check(final JsonNode config) {
try {
final JdbcDatabase database = getDatabase(dataSource);
final String outputSchema = super.getNamingResolver().getIdentifier(config.get("schema").asText());
attemptSQLCreateAndDropTableOperations(outputSchema, database, nameTransformer, snowflakeS3StagingSqlOperations);
attemptSQLCreateAndDropStages(outputSchema, database, nameTransformer, snowflakeS3StagingSqlOperations);
attemptTableOperations(outputSchema, database, nameTransformer, snowflakeS3StagingSqlOperations,
true);
attemptStageOperations(outputSchema, database, nameTransformer, snowflakeS3StagingSqlOperations);
return new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED);
} catch (final Exception e) {
LOGGER.error("Exception while checking connection: ", e);
Expand All @@ -77,7 +78,7 @@ public AirbyteConnectionStatus check(final JsonNode config) {
}
}

private static void attemptSQLCreateAndDropStages(final String outputSchema,
private static void attemptStageOperations(final String outputSchema,
final JdbcDatabase database,
final NamingConventionTransformer namingResolver,
final SnowflakeS3StagingSqlOperations sqlOperations)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.integrations.destination.snowflake;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

Expand All @@ -14,6 +15,7 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.commons.string.Strings;
import io.airbyte.config.StandardCheckConnectionOutput;
import io.airbyte.config.StandardCheckConnectionOutput.Status;
import io.airbyte.db.factory.DataSourceFactory;
import io.airbyte.db.jdbc.JdbcDatabase;
Expand Down Expand Up @@ -41,6 +43,11 @@
public class SnowflakeInsertDestinationAcceptanceTest extends DestinationAcceptanceTest {

private static final NamingConventionTransformer NAME_TRANSFORMER = new SnowflakeSQLNameTransformer();
protected static final String NO_ACTIVE_WAREHOUSE_ERR_MSG =
"No active warehouse selected in the current session. Select an active warehouse with the 'use warehouse' command.";

protected static final String NO_USER_PRIVILEGES_ERR_MSG =
"Schema 'TEXT_SCHEMA' already exists, but current role has no privileges on it.";

// this config is based on the static config, and it contains a random
// schema name that is different for each test run
Expand Down Expand Up @@ -178,6 +185,30 @@ protected void tearDown(final TestDestinationEnv testEnv) throws Exception {
DataSourceFactory.close(dataSource);
}

@Test
public void testCheckWithNoActiveWarehouseConnection() throws Exception {
// Config to user(creds) that has no warehouse assigned
final JsonNode config = Jsons.deserialize(IOs.readFile(
Path.of("secrets/internal_staging_config_no_active_warehouse.json")));

StandardCheckConnectionOutput standardCheckConnectionOutput = runCheck(config);

assertEquals(Status.FAILED, standardCheckConnectionOutput.getStatus());
assertThat(standardCheckConnectionOutput.getMessage()).contains(NO_ACTIVE_WAREHOUSE_ERR_MSG);
}

@Test
public void testCheckWithNoTextSchemaPermissionConnection() throws Exception {
// Config to user (creds) that has no permission to schema
final JsonNode config = Jsons.deserialize(IOs.readFile(
Path.of("secrets/config_no_text_schema_permission.json")));

StandardCheckConnectionOutput standardCheckConnectionOutput = runCheck(config);

assertEquals(Status.FAILED, standardCheckConnectionOutput.getStatus());
assertThat(standardCheckConnectionOutput.getMessage()).contains(NO_USER_PRIVILEGES_ERR_MSG);
}

@Test
public void testBackwardCompatibilityAfterAddingOauth() {
final JsonNode deprecatedStyleConfig = Jsons.clone(config);
Expand Down
5 changes: 3 additions & 2 deletions docs/integrations/destinations/snowflake.md
Original file line number Diff line number Diff line change
Expand Up @@ -275,8 +275,9 @@ Now that you have set up the Snowflake destination connector, check out the foll

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.4.38 | 2022-09-26 | [\#17115](https://github.com/airbytehq/airbyte/pull/17115) | Added connection string identifier |
| 0.4.37 | 2022-09-21 | [\#16839](https://github.com/airbytehq/airbyte/pull/16839) | Update JDBC driver for Snowflake to 3.13.19 |
| 0.4.39 | 2022-11-09 | [\#18970](https://github.com/airbytehq/airbyte/pull/18970) | Updated "check" connection method to handle more errors |
| 0.4.38 | 2022-09-26 | [\#17115](https://github.com/airbytehq/airbyte/pull/17115) | Added connection string identifier |
| 0.4.37 | 2022-09-21 | [\#16839](https://github.com/airbytehq/airbyte/pull/16839) | Update JDBC driver for Snowflake to 3.13.19 |
| 0.4.36 | 2022-09-14 | [\#15668](https://github.com/airbytehq/airbyte/pull/15668) | Wrap logs in AirbyteLogMessage |
| 0.4.35 | 2022-09-01 | [\#16243](https://github.com/airbytehq/airbyte/pull/16243) | Fix Json to Avro conversion when there is field name clash from combined restrictions (`anyOf`, `oneOf`, `allOf` fields). |
| 0.4.34 | 2022-07-23 | [\#14388](https://github.com/airbytehq/airbyte/pull/14388) | Add support for key pair authentication |
Expand Down

0 comments on commit b5c1955

Please sign in to comment.