Skip to content

Commit

Permalink
check for permissions error on snowflake destinations (#21764)
Browse files Browse the repository at this point in the history
* check for permissions error on snowflake destinations

* update tests to use asset throws, make config error exception message clearer

* update docs

* change error message test

* the right write

* catch known permissions errors and rethrow as config errors

* auto-bump connector version

Co-authored-by: Octavia Squidington III <[email protected]>
  • Loading branch information
jbfbell and octavia-squidington-iii authored Jan 25, 2023
1 parent 392de0e commit 3d92cb6
Show file tree
Hide file tree
Showing 9 changed files with 95 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@
- name: Snowflake
destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba
dockerRepository: airbyte/destination-snowflake
dockerImageTag: 0.4.44
dockerImageTag: 0.4.45
documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake
icon: snowflake.svg
normalizationConfig:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6109,7 +6109,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-snowflake:0.4.44"
- dockerImage: "airbyte/destination-snowflake:0.4.45"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/destinations/snowflake"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.integrations.destination.jdbc;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.base.JavaBaseConstants;
Expand Down Expand Up @@ -43,12 +44,27 @@ protected JdbcSqlOperations(final DataAdapter dataAdapter) {

@Override
public void createSchemaIfNotExists(final JdbcDatabase database, final String schemaName) throws Exception {
if (!schemaSet.contains(schemaName) && !isSchemaExists(database, schemaName)) {
database.execute(String.format("CREATE SCHEMA IF NOT EXISTS %s;", schemaName));
schemaSet.add(schemaName);
try {
if (!schemaSet.contains(schemaName) && !isSchemaExists(database, schemaName)) {
database.execute(String.format("CREATE SCHEMA IF NOT EXISTS %s;", schemaName));
schemaSet.add(schemaName);
}
} catch (Exception e) {
throw checkForKnownConfigExceptions(e).orElseThrow(() -> e);
}
}

/**
* When an exception occurs, we may recognize it as an issue with the users permissions
* or other configuration options. In these cases, we can wrap the exception in a {@link ConfigErrorException}
* which will exclude the error from our on-call paging/reporting
* @param e the exception to check.
* @return A ConfigErrorException with a message with actionable feedback to the user.
*/
protected Optional<ConfigErrorException> checkForKnownConfigExceptions(Exception e) {
return Optional.empty();
}

@Override
public void createTableIfNotExists(final JdbcDatabase database, final String schemaName, final String tableName) throws SQLException {
database.execute(createTableQuery(database, schemaName, tableName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,12 @@

import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
import java.sql.SQLException;
import java.util.List;
import org.junit.Assert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

public class TestJdbcSqlOperations extends JdbcSqlOperations {

Expand All @@ -19,4 +24,18 @@ public void insertRecordsInternal(final JdbcDatabase database,
// Not required for the testing
}

@Test
public void testCreateSchemaIfNotExists() {
final JdbcDatabase db = Mockito.mock(JdbcDatabase.class);
final var schemaName = "foo";
try {
Mockito.doThrow(new SQLException("TEST")).when(db).execute(Mockito.anyString());
} catch (Exception e) {
// This would not be expected, but the `execute` method above will flag as an unhandled exception
assert false;
}
SQLException exception = Assertions.assertThrows(SQLException.class, () -> createSchemaIfNotExists(db, schemaName));
Assertions.assertEquals(exception.getMessage(), "TEST");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@ RUN tar xf ${APPLICATION}.tar --strip-components=1

ENV ENABLE_SENTRY true

LABEL io.airbyte.version=0.4.44
LABEL io.airbyte.version=0.4.45
LABEL io.airbyte.name=airbyte/destination-snowflake
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.integrations.destination.snowflake;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.jdbc.JdbcSqlOperations;
Expand All @@ -13,8 +14,10 @@
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
import java.sql.SQLException;
import java.util.List;
import java.util.Optional;
import java.util.StringJoiner;
import java.util.stream.Stream;
import net.snowflake.client.jdbc.SnowflakeSQLException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -23,6 +26,10 @@ class SnowflakeSqlOperations extends JdbcSqlOperations implements SqlOperations
private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeSqlOperations.class);
private static final int MAX_FILES_IN_LOADING_QUERY_LIMIT = 1000;

// This is an unfortunately fragile way to capture this, but Snowflake doesn't
// provide a more specific permission exception error code
private static final String NO_PRIVILEGES_ERROR_MESSAGE = "but current role has no privileges on it";

@Override
public String createTableQuery(final JdbcDatabase database, final String schemaName, final String tableName) {
return String.format(
Expand Down Expand Up @@ -73,4 +80,12 @@ protected String generateFilesList(final List<String> files) {
}
}

@Override
protected Optional<ConfigErrorException> checkForKnownConfigExceptions(Exception e) {
if (e instanceof SnowflakeSQLException && e.getMessage().contains(NO_PRIVILEGES_ERROR_MESSAGE)) {
return Optional.of(new ConfigErrorException("Encountered Error with Snowflake Configuration: Current role does not have permissions on the target schema please verify your privileges", e));
}
return Optional.empty();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class SnowflakeInsertDestinationAcceptanceTest extends DestinationAccepta
"No active warehouse selected in the current session. Select an active warehouse with the 'use warehouse' command.";

protected static final String NO_USER_PRIVILEGES_ERR_MSG =
"Schema 'TEXT_SCHEMA' already exists, but current role has no privileges on it.";
"Encountered Error with Snowflake Configuration: Current role does not have permissions on the target schema please verify your privileges";

// this config is based on the static config, and it contains a random
// schema name that is different for each test run
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,20 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.commons.functional.CheckedConsumer;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import net.snowflake.client.jdbc.SnowflakeSQLException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.mockito.Mockito;

class SnowflakeSqlOperationsTest {

Expand Down Expand Up @@ -53,4 +59,24 @@ void insertRecordsInternal() throws SQLException {
verify(db, times(1)).execute(any(CheckedConsumer.class));
}

@ParameterizedTest
@CsvSource({"TEST,false", "but current role has no privileges on it,true"})
public void testCreateSchemaIfNotExists(final String message, final boolean shouldCapture) {
final JdbcDatabase db = Mockito.mock(JdbcDatabase.class);
final var schemaName = "foo";
try {
Mockito.doThrow(new SnowflakeSQLException(message)).when(db).execute(Mockito.anyString());
} catch (Exception e) {
// This would not be expected, but the `execute` method above will flag as an unhandled exception
assert false;
}
Exception exception = Assertions.assertThrows(Exception.class, () -> snowflakeSqlOperations.createSchemaIfNotExists(db, schemaName));
if (shouldCapture) {
Assertions.assertInstanceOf(ConfigErrorException.class, exception);
} else {
Assertions.assertInstanceOf(SnowflakeSQLException.class, exception);
Assertions.assertEquals(exception.getMessage(), message);
}
}

}
13 changes: 12 additions & 1 deletion docs/integrations/destinations/snowflake.md
Original file line number Diff line number Diff line change
Expand Up @@ -272,12 +272,23 @@ Now that you have set up the Snowflake destination connector, check out the foll
- [Migrate your data from Redshift to Snowflake](https://airbyte.com/tutorials/redshift-to-snowflake)
- [Orchestrate ELT pipelines with Prefect, Airbyte and dbt](https://airbyte.com/tutorials/elt-pipeline-prefect-airbyte-dbt)

## Troubleshooting

### 'Current role does not have permissions on the target schema'
If you receive an error stating `Current role does not have permissions on the target schema` make sure that the
Snowflake destination `SCHEMA` is one that the role you've provided has permissions on. When creating a connection,
it may allow you to select `Mirror source structure` for the `Destination namespace`, which if you have followed
some of our default examples and tutorials may result in the connection trying to write to a `PUBLIC` schema.

A quick fix could be to edit your connection's 'Replication' settings from `Mirror source structure` to `Destination Default`.
Otherwise, make sure to grant the role the required permissions in the desired namespace.

## Changelog

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.4.44 | 2023-01-20 | [#21087](https://github.com/airbytehq/airbyte/pull/21087) | Wrap Authentication Errors as Config Exceptions |
| 0.4.45 | 2023-01-25 | [#21087](https://github.com/airbytehq/airbyte/pull/21764) | Catch Known Permissions and rethrow as ConfigExceptions |
| 0.4.44 | 2023-01-20 | [#21087](https://github.com/airbytehq/airbyte/pull/21087) | Wrap Authentication Errors as Config Exceptions |
| 0.4.43 | 2023-01-20 | [\#21450](https://github.com/airbytehq/airbyte/pull/21450) | Updated Check methods to handle more possible s3 and gcs stagings issues |
| 0.4.42 | 2023-01-12 | [\#21342](https://github.com/airbytehq/airbyte/pull/21342) | Better handling for conflicting destination streams |
| 0.4.41 | 2022-12-16 | [\#20566](https://github.com/airbytehq/airbyte/pull/20566) | Improve spec to adhere to standards |
Expand Down

0 comments on commit 3d92cb6

Please sign in to comment.