diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml
index 39285b789ebe..d84582706ba2 100644
--- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml
+++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml
@@ -348,7 +348,7 @@
 - name: Snowflake
   destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba
   dockerRepository: airbyte/destination-snowflake
-  dockerImageTag: 0.4.42
+  dockerImageTag: 0.4.43
   documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake
   icon: snowflake.svg
   normalizationConfig:
diff --git a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml
index 859ec2b6d8ab..aa768b304061 100644
--- a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml
+++ b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml
@@ -6109,7 +6109,7 @@
     supported_destination_sync_modes:
     - "overwrite"
     - "append"
-- dockerImage: "airbyte/destination-snowflake:0.4.42"
+- dockerImage: "airbyte/destination-snowflake:0.4.43"
   spec:
     documentationUrl: "https://docs.airbyte.com/integrations/destinations/snowflake"
     connectionSpecification:
diff --git a/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/csv/CsvSerializedBuffer.java b/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/csv/CsvSerializedBuffer.java
index 0dbb695e5b01..f16b454c745e 100644
--- a/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/csv/CsvSerializedBuffer.java
+++ b/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/csv/CsvSerializedBuffer.java
@@ -31,9 +31,9 @@ public class CsvSerializedBuffer extends BaseSerializedBuffer {
   private CSVPrinter csvPrinter;
   private CSVFormat csvFormat;
 
-  protected CsvSerializedBuffer(final BufferStorage bufferStorage,
-                                final CsvSheetGenerator csvSheetGenerator,
-                                final boolean compression)
+  public CsvSerializedBuffer(final BufferStorage bufferStorage,
+                             final CsvSheetGenerator csvSheetGenerator,
+                             final boolean compression)
       throws Exception {
     super(bufferStorage);
     this.csvSheetGenerator = csvSheetGenerator;
diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyDestination.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyDestination.java
index 51ba72727648..85d5c733bd67 100644
--- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyDestination.java
+++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyDestination.java
@@ -97,7 +97,7 @@ protected void performCreateInsertTestOnDestination(final String outputSchema,
                                                       final JdbcDatabase database,
                                                       final NamingConventionTransformer nameTransformer)
       throws Exception {
-    AbstractJdbcDestination.attemptSQLCreateAndDropTableOperations(outputSchema, database, nameTransformer, getSqlOperations());
+    AbstractJdbcDestination.attemptTableOperations(outputSchema, database, nameTransformer, getSqlOperations(), true);
   }
 
 }
diff --git a/airbyte-integrations/connectors/destination-snowflake/Dockerfile b/airbyte-integrations/connectors/destination-snowflake/Dockerfile
index fca0e9654d41..4b8f627b28ea 100644
--- a/airbyte-integrations/connectors/destination-snowflake/Dockerfile
+++ b/airbyte-integrations/connectors/destination-snowflake/Dockerfile
@@ -20,5 +20,5 @@ RUN tar xf ${APPLICATION}.tar --strip-components=1
 
 ENV ENABLE_SENTRY true
 
-LABEL io.airbyte.version=0.4.42
+LABEL io.airbyte.version=0.4.43
 LABEL io.airbyte.name=airbyte/destination-snowflake
diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeGcsStagingDestination.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeGcsStagingDestination.java
index d451dab54d52..ac3fdfc832d7 100644
--- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeGcsStagingDestination.java
+++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeGcsStagingDestination.java
@@ -5,9 +5,11 @@
 package io.airbyte.integrations.destination.snowflake;
 
 import static io.airbyte.integrations.destination.snowflake.SnowflakeS3StagingDestination.isPurgeStagingData;
+import static java.nio.charset.StandardCharsets.UTF_8;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.auth.oauth2.GoogleCredentials;
+import com.google.cloud.WriteChannel;
 import com.google.cloud.storage.BlobId;
 import com.google.cloud.storage.BlobInfo;
 import com.google.cloud.storage.Storage;
@@ -29,6 +31,7 @@
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.Map;
@@ -83,12 +86,19 @@ public AirbyteConnectionStatus check(final JsonNode config) {
   }
 
   private static void attemptWriteAndDeleteGcsObject(final GcsConfig gcsConfig, final String outputTableName) throws IOException {
-    final var storage = getStorageClient(gcsConfig);
-    final var blobId = BlobId.of(gcsConfig.getBucketName(), "check-content/" + outputTableName);
-    final var blobInfo = BlobInfo.newBuilder(blobId).build();
+    final Storage storageClient = getStorageClient(gcsConfig);
+    final BlobId blobId = BlobId.of(gcsConfig.getBucketName(), "check-content/" + outputTableName);
+    final BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("text/plain").build();
 
-    storage.create(blobInfo, "".getBytes(StandardCharsets.UTF_8));
-    storage.delete(blobId);
+    storageClient.create(blobInfo);
+
+    try (WriteChannel writer = storageClient.writer(blobInfo)) {
+      // Try to write a dummy message to make sure user has all required permissions
+      final byte[] content = "Hello, World!".getBytes(UTF_8);
+      writer.write(ByteBuffer.wrap(content, 0, content.length));
+    } finally {
+      storageClient.delete(blobId);
+    }
   }
 
   public static Storage getStorageClient(final GcsConfig gcsConfig) throws IOException {
diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingSqlOperations.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingSqlOperations.java
index e1765ecbd5bf..2f917fe7d474 100644
--- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingSqlOperations.java
+++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingSqlOperations.java
@@ -134,7 +134,8 @@ public void createStageIfNotExists(final JdbcDatabase database, final String sta
   }
 
   /**
-   * Creates a SQL query to create a staging folder. This query will create a staging folder if one previously did not exist
+   * Creates a SQL query to create a staging folder. This query will create a staging folder if one
+   * previously did not exist
    *
    * @param stageName name of the staging folder
    * @return SQL query string
@@ -157,8 +158,8 @@ public void copyIntoTmpTableFromStage(final JdbcDatabase database,
   }
 
   /**
-   * Creates a SQL query to bulk copy data into fully qualified destination table
-   * See https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html for more context
+   * Creates a SQL query to bulk copy data into fully qualified destination table See
+   * https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html for more context
    *
    * @param stageName name of staging folder
    * @param stagingPath path of staging folder to data files
@@ -200,8 +201,8 @@ public void cleanUpStage(final JdbcDatabase database, final String stageName, fi
   }
 
   /**
-   * Creates a SQL query used to remove staging files that were just staged
-   * See https://docs.snowflake.com/en/sql-reference/sql/remove.html for more context
+   * Creates a SQL query used to remove staging files that were just staged See
+   * https://docs.snowflake.com/en/sql-reference/sql/remove.html for more context
    *
    * @param stageName name of staging folder
    * @return SQL query string
diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StagingDestination.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StagingDestination.java
index ead94c94ba9e..bf1f5c220c3c 100644
--- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StagingDestination.java
+++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StagingDestination.java
@@ -13,15 +13,18 @@
 import io.airbyte.integrations.destination.NamingConventionTransformer;
 import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination;
 import io.airbyte.integrations.destination.record_buffer.FileBuffer;
+import io.airbyte.integrations.destination.record_buffer.InMemoryBuffer;
 import io.airbyte.integrations.destination.s3.AesCbcEnvelopeEncryption;
 import io.airbyte.integrations.destination.s3.AesCbcEnvelopeEncryption.KeyType;
 import io.airbyte.integrations.destination.s3.EncryptionConfig;
 import io.airbyte.integrations.destination.s3.S3DestinationConfig;
 import io.airbyte.integrations.destination.s3.csv.CsvSerializedBuffer;
+import io.airbyte.integrations.destination.s3.csv.StagingDatabaseCsvSheetGenerator;
 import io.airbyte.integrations.destination.staging.StagingConsumerFactory;
 import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
 import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status;
 import io.airbyte.protocol.models.v0.AirbyteMessage;
+import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
 import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
 import java.util.Collections;
 import java.util.Map;
@@ -90,7 +93,26 @@ private static void attemptStageOperations(final String outputSchema,
     final String outputTableName = namingResolver.getIdentifier("_airbyte_connection_test_" + UUID.randomUUID());
     final String stageName = sqlOperations.getStageName(outputSchema, outputTableName);
     sqlOperations.createStageIfNotExists(database, stageName);
-    sqlOperations.dropStageIfExists(database, stageName);
+
+    // try a test write to make sure we have the required role
+    try {
+      final CsvSerializedBuffer csvSerializedBuffer = new CsvSerializedBuffer(
+          new InMemoryBuffer(".csv"),
+          new StagingDatabaseCsvSheetGenerator(),
+          true);
+
+      // create a dummy stream/record that will be used to test uploading
+      csvSerializedBuffer.accept(new AirbyteRecordMessage()
+          .withData(Jsons.jsonNode(Map.of("testKey", "testValue")))
+          .withEmittedAt(System.currentTimeMillis()));
+      csvSerializedBuffer.flush();
+
+      sqlOperations.uploadRecordsToStage(database, csvSerializedBuffer, outputSchema, stageName,
+          stageName.endsWith("/") ? stageName : stageName + "/");
+    } finally {
+      // drop the created tmp stage
+      sqlOperations.dropStageIfExists(database, stageName);
+    }
   }
 
   @Override
diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeGcsCopyDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeGcsCopyDestinationAcceptanceTest.java
index 1e8e5863d5db..eb85788f6c64 100644
--- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeGcsCopyDestinationAcceptanceTest.java
+++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeGcsCopyDestinationAcceptanceTest.java
@@ -4,14 +4,23 @@
 
 package io.airbyte.integrations.destination.snowflake;
 
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.base.Preconditions;
 import io.airbyte.commons.io.IOs;
 import io.airbyte.commons.json.Jsons;
+import io.airbyte.config.StandardCheckConnectionOutput;
+import io.airbyte.config.StandardCheckConnectionOutput.Status;
 import java.nio.file.Path;
+import org.junit.jupiter.api.Test;
 
 public class SnowflakeGcsCopyDestinationAcceptanceTest extends SnowflakeInsertDestinationAcceptanceTest {
 
+  private static final String NO_GCS_PRIVILEGES_ERR_MSG =
+      "Permission 'storage.objects.create' denied on resource (or it may not exist).";
+
   @Override
   public JsonNode getStaticConfig() {
     final JsonNode copyConfig = Jsons.deserialize(IOs.readFile(Path.of("secrets/copy_gcs_config.json")));
@@ -20,4 +29,16 @@ public JsonNode getStaticConfig() {
     return copyConfig;
   }
 
+  @Test
+  public void testCheckWithNoProperGcsPermissionConnection() throws Exception {
+    // Config for a user (creds) that has no permission to the schema
+    final JsonNode config = Jsons.deserialize(IOs.readFile(
+        Path.of("secrets/copy_insufficient_gcs_roles_config.json")));
+
+    StandardCheckConnectionOutput standardCheckConnectionOutput = runCheck(config);
+
+    assertEquals(Status.FAILED, standardCheckConnectionOutput.getStatus());
+    assertThat(standardCheckConnectionOutput.getMessage()).contains(NO_GCS_PRIVILEGES_ERR_MSG);
+  }
+
 }
diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3CopyEncryptedDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3CopyEncryptedDestinationAcceptanceTest.java
index 2fba654e1c02..61bcb7b9f71e 100644
--- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3CopyEncryptedDestinationAcceptanceTest.java
+++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3CopyEncryptedDestinationAcceptanceTest.java
@@ -4,14 +4,22 @@
 
 package io.airbyte.integrations.destination.snowflake;
 
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.base.Preconditions;
 import io.airbyte.commons.io.IOs;
 import io.airbyte.commons.json.Jsons;
+import io.airbyte.config.StandardCheckConnectionOutput;
+import io.airbyte.config.StandardCheckConnectionOutput.Status;
 import java.nio.file.Path;
+import org.junit.jupiter.api.Test;
 
 public class SnowflakeS3CopyEncryptedDestinationAcceptanceTest extends SnowflakeInsertDestinationAcceptanceTest {
 
+  private static final String NO_S3_PRIVILEGES_ERR_MSG = "Could not connect with provided configuration.";
+
   @Override
   public JsonNode getStaticConfig() {
     final JsonNode copyConfig = Jsons.deserialize(IOs.readFile(Path.of("secrets/copy_s3_encrypted_config.json")));
@@ -20,4 +28,16 @@ public JsonNode getStaticConfig() {
     return copyConfig;
   }
 
+  @Test
+  public void testCheckWithNoProperS3PermissionConnection() throws Exception {
+    // Config for a user (creds) that has no permission to the schema
+    final JsonNode config = Jsons.deserialize(IOs.readFile(
+        Path.of("secrets/copy_s3_wrong_location_config.json")));
+
+    StandardCheckConnectionOutput standardCheckConnectionOutput = runCheck(config);
+
+    assertEquals(Status.FAILED, standardCheckConnectionOutput.getStatus());
+    assertThat(standardCheckConnectionOutput.getMessage()).contains(NO_S3_PRIVILEGES_ERR_MSG);
+  }
+
 }
diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeTestSourceOperations.java b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeTestSourceOperations.java
index f2958524dcdf..46ee859f3909 100644
--- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeTestSourceOperations.java
+++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeTestSourceOperations.java
@@ -4,6 +4,9 @@
 
 package io.airbyte.integrations.destination.snowflake;
 
+import static io.airbyte.db.jdbc.DateTimeConverter.putJavaSQLDate;
+import static io.airbyte.db.jdbc.DateTimeConverter.putJavaSQLTime;
+
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import io.airbyte.db.jdbc.JdbcSourceOperations;
 import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTestUtils;
@@ -17,4 +20,22 @@ protected void putString(ObjectNode node, String columnName, ResultSet resultSet
     DestinationAcceptanceTestUtils.putStringIntoJson(resultSet.getString(index), columnName, node);
   }
 
+  @Override
+  protected void putDate(final ObjectNode node,
+                         final String columnName,
+                         final ResultSet resultSet,
+                         final int index)
+      throws SQLException {
+    putJavaSQLDate(node, columnName, resultSet, index);
+  }
+
+  @Override
+  protected void putTime(final ObjectNode node,
+                         final String columnName,
+                         final ResultSet resultSet,
+                         final int index)
+      throws SQLException {
+    putJavaSQLTime(node, columnName, resultSet, index);
+  }
+
 }
diff --git a/docs/integrations/destinations/snowflake.md b/docs/integrations/destinations/snowflake.md
index 9a690157e386..b9c6ba290ce2 100644
--- a/docs/integrations/destinations/snowflake.md
+++ b/docs/integrations/destinations/snowflake.md
@@ -277,7 +277,8 @@ Now that you have set up the Snowflake destination connector, check out the foll
 
 | Version | Date       | Pull Request                                               | Subject                                                                          |
 |:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------|
-| 0.4.41  | 2023-01-12 | [\#21342](https://github.com/airbytehq/airbyte/pull/21342) | Better handling for conflicting destination streams                             |
+| 0.4.43  | 2023-01-20 | [\#21450](https://github.com/airbytehq/airbyte/pull/21450) | Updated check methods to handle more possible S3 and GCS staging issues         |
+| 0.4.42  | 2023-01-12 | [\#21342](https://github.com/airbytehq/airbyte/pull/21342) | Better handling for conflicting destination streams                             |
 | 0.4.41  | 2022-12-16 | [\#20566](https://github.com/airbytehq/airbyte/pull/20566) | Improve spec to adhere to standards                                              |
 | 0.4.40  | 2022-11-11 | [\#19302](https://github.com/airbytehq/airbyte/pull/19302) | Set jdbc application env variable depends on env - airbyte_oss or airbyte_cloud |
 | 0.4.39  | 2022-11-09 | [\#18970](https://github.com/airbytehq/airbyte/pull/18970) | Updated "check" connection method to handle more errors                         |
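
For reference (not part of the patch above): the new GCS check in attemptWriteAndDeleteGcsObject probes write access up front by creating a probe object, streaming a dummy payload through a WriteChannel, and deleting it, so a missing storage.objects.create permission surfaces during "check" rather than mid-sync. The listing below is a minimal standalone Java sketch of that probe using only the google-cloud-storage client; the bucket name and service-account key path are illustrative placeholders, not values from this PR.

import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import java.io.FileInputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class GcsWritePermissionProbe {

  public static void main(final String[] args) throws Exception {
    // Placeholders: substitute a real service-account key file and bucket name.
    final GoogleCredentials credentials = GoogleCredentials.fromStream(new FileInputStream("sa-key.json"));
    final Storage storage = StorageOptions.newBuilder().setCredentials(credentials).build().getService();

    final BlobId blobId = BlobId.of("my-staging-bucket", "check-content/connection-test");
    final BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("text/plain").build();

    // Create an empty probe object, then stream a dummy payload through a WriteChannel so that
    // insufficient write privileges fail here instead of during the first sync.
    storage.create(blobInfo);
    try (WriteChannel writer = storage.writer(blobInfo)) {
      writer.write(ByteBuffer.wrap("Hello, World!".getBytes(StandardCharsets.UTF_8)));
    } finally {
      // Clean up the probe object whether or not the write succeeded.
      storage.delete(blobId);
    }
  }
}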