Commit 43470a2
🎉 Destination Snowflake: Add option to stage encrypted files via S3; fix purge_staging_data (#12452)
edgao authored May 4, 2022
1 parent cc24c5d commit 43470a2
Showing 27 changed files with 736 additions and 126 deletions.
destination_definitions.yaml

@@ -226,7 +226,7 @@
 - name: Snowflake
   destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba
   dockerRepository: airbyte/destination-snowflake
-  dockerImageTag: 0.4.24
+  dockerImageTag: 0.4.25
   documentationUrl: https://docs.airbyte.io/integrations/destinations/snowflake
   icon: snowflake.svg
   resourceRequirements:
destination_specs.yaml

@@ -4076,7 +4076,7 @@
     supported_destination_sync_modes:
     - "overwrite"
     - "append"
-- dockerImage: "airbyte/destination-snowflake:0.4.24"
+- dockerImage: "airbyte/destination-snowflake:0.4.25"
   spec:
     documentationUrl: "https://docs.airbyte.io/integrations/destinations/snowflake"
     connectionSpecification:
@@ -4143,6 +4143,7 @@
           order: 5
         credentials:
           title: "Authorization Method"
+          description: ""
           type: "object"
           oneOf:
           - type: "object"
@@ -4212,6 +4213,8 @@
             - "method"
             properties:
               method:
+                title: ""
+                description: ""
                 type: "string"
                 enum:
                 - "Standard"
@@ -4225,6 +4228,8 @@
             - "method"
             properties:
               method:
+                title: ""
+                description: ""
                 type: "string"
                 enum:
                 - "Internal Staging"
@@ -4241,6 +4246,8 @@
             - "secret_access_key"
             properties:
               method:
+                title: ""
+                description: ""
                 type: "string"
                 enum:
                 - "S3 Staging"
@@ -4324,6 +4331,46 @@
                  \ to true."
                default: true
                order: 6
+              encryption:
+                title: "Encryption"
+                type: "object"
+                description: "How to encrypt the staging data"
+                default:
+                  encryption_type: "none"
+                order: 7
+                oneOf:
+                - title: "No encryption"
+                  description: "Staging data will be stored in plaintext."
+                  type: "object"
+                  required:
+                  - "encryption_type"
+                  properties:
+                    encryption_type:
+                      type: "string"
+                      const: "none"
+                      enum:
+                      - "none"
+                      default: "none"
+                - title: "AES-CBC envelope encryption"
+                  description: "Staging data will be encrypted using AES-CBC envelope\
+                    \ encryption."
+                  type: "object"
+                  required:
+                  - "encryption_type"
+                  properties:
+                    encryption_type:
+                      type: "string"
+                      const: "aes_cbc_envelope"
+                      enum:
+                      - "aes_cbc_envelope"
+                      default: "aes_cbc_envelope"
+                    key_encrypting_key:
+                      type: "string"
+                      title: "Key"
+                      description: "The key, base64-encoded. Must be either 128, 192,\
+                        \ or 256 bits. Leave blank to have Airbyte generate an ephemeral\
+                        \ key for each sync."
+                      airbyte_secret: true
       - title: "GCS Staging"
         additionalProperties: false
         description: "Writes large batches of records to a file, uploads the file\
@@ -4336,6 +4383,8 @@
             - "credentials_json"
             properties:
               method:
+                title: ""
+                description: ""
                 type: "string"
                 enum:
                 - "GCS Staging"
@@ -4382,6 +4431,8 @@
             - "azure_blob_storage_sas_token"
             properties:
               method:
+                title: ""
+                description: ""
                 type: "string"
                 enum:
                 - "Azure Blob Staging"
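Taken together, the new spec fields mean an S3-staging config can now carry an encryption object alongside purge_staging_data. A hedged illustration of the relevant fragment, parsed with Jackson the way a connector would see it (field names follow the spec above; all other required connection fields are elided):

// Illustrative only: the "loading_method" fragment produced by the new spec.
// Omit "key_encrypting_key" (or leave it blank) to have Airbyte generate an
// ephemeral key for the sync.
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

final JsonNode loadingMethod = new ObjectMapper().readTree("""
    {
      "method": "S3 Staging",
      "purge_staging_data": true,
      "encryption": {
        "encryption_type": "aes_cbc_envelope",
        "key_encrypting_key": "(base64-encoded 128/192/256-bit key)"
      }
    }
    """);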
@@ -9,7 +9,9 @@
 import io.airbyte.integrations.destination.NamingConventionTransformer;
 import io.airbyte.integrations.destination.s3.S3DestinationConfig;
 import io.airbyte.integrations.destination.s3.S3StorageOperations;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,4 +45,8 @@ protected void cleanUpObjects(final String bucket, final List<KeyVersion> keysTo
     }
   }
 
+  @Override
+  protected Map<String, String> getMetadataMapping() {
+    return new HashMap<>();
+  }
 }
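For context on the empty map above: the blob decorators publish the encrypted CEK and IV under canonical keys, and each storage backend's getMetadataMapping() decides which object-metadata names those values land on; returning an empty map suppresses the encryption metadata entirely for this backend. The real S3 mapping is not part of this excerpt, so the following is a hypothetical sketch of what a non-empty mapping could look like (the "x-amz-..." names are an assumption, not taken from this commit):

  // Hypothetical illustration only; the actual mapping lives in
  // S3StorageOperations, which this diff does not show.
  @Override
  protected Map<String, String> getMetadataMapping() {
    return Map.of(
        AesCbcEnvelopeEncryptionBlobDecorator.ENCRYPTED_CONTENT_ENCRYPTING_KEY, "x-amz-key",
        AesCbcEnvelopeEncryptionBlobDecorator.INITIALIZATION_VECTOR, "x-amz-iv");
  }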
StagingConsumerFactory.java

@@ -59,7 +59,8 @@ public AirbyteMessageConsumer create(final Consumer<AirbyteMessage> outputRecord
                                        final NamingConventionTransformer namingResolver,
                                        final CheckedBiFunction<AirbyteStreamNameNamespacePair, ConfiguredAirbyteCatalog, SerializableBuffer, Exception> onCreateBuffer,
                                        final JsonNode config,
-                                       final ConfiguredAirbyteCatalog catalog) {
+                                       final ConfiguredAirbyteCatalog catalog,
+                                       final boolean purgeStagingData) {
     final List<WriteConfig> writeConfigs = createWriteConfigs(namingResolver, config, catalog);
     return new BufferedStreamConsumer(
         outputRecordCollector,
@@ -68,7 +69,7 @@ public AirbyteMessageConsumer create(final Consumer<AirbyteMessage> outputRecord
             onCreateBuffer,
             catalog,
             flushBufferFunction(database, stagingOperations, writeConfigs, catalog)),
-        onCloseFunction(database, stagingOperations, writeConfigs),
+        onCloseFunction(database, stagingOperations, writeConfigs, purgeStagingData),
         catalog,
         stagingOperations::isValidData);
   }
@@ -177,7 +178,8 @@ private CheckedBiConsumer<AirbyteStreamNameNamespacePair, SerializableBuffer, Ex
 
   private OnCloseFunction onCloseFunction(final JdbcDatabase database,
                                           final StagingOperations stagingOperations,
-                                          final List<WriteConfig> writeConfigs) {
+                                          final List<WriteConfig> writeConfigs,
+                                          final boolean purgeStagingData) {
     return (hasFailed) -> {
       if (!hasFailed) {
         final List<String> queryList = new ArrayList<>();
@@ -224,10 +226,12 @@ private OnCloseFunction onCloseFunction(final JdbcDatabase database,
             tmpTableName);
 
         stagingOperations.dropTableIfExists(database, schemaName, tmpTableName);
-        final String stageName = stagingOperations.getStageName(schemaName, writeConfig.getStreamName());
-        LOGGER.info("Cleaning stage in destination started for stream {}. schema {}, stage: {}", writeConfig.getStreamName(), schemaName,
-            stageName);
-        stagingOperations.dropStageIfExists(database, stageName);
+        if (purgeStagingData) {
+          final String stageName = stagingOperations.getStageName(schemaName, writeConfig.getStreamName());
+          LOGGER.info("Cleaning stage in destination started for stream {}. schema {}, stage: {}", writeConfig.getStreamName(), schemaName,
+              stageName);
+          stagingOperations.dropStageIfExists(database, stageName);
+        }
       }
       LOGGER.info("Cleaning up destination completed.");
     };
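The purgeStagingData flag added above gates stage cleanup on close, fixing syncs that set purge_staging_data: false. How a destination derives the flag from its config before calling create(...) is outside this excerpt; a minimal sketch, assuming the spec's purge_staging_data field with its default of true and a Jackson JsonNode config:

  // Hypothetical helper, not part of this commit: read the purge_staging_data
  // option from the connector config, defaulting to true per the spec.
  private static boolean isPurgeStagingData(final JsonNode config) {
    return !config.has("purge_staging_data") || config.get("purge_staging_data").asBoolean(true);
  }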
AesCbcEnvelopeEncryption.java (new file)

@@ -0,0 +1,66 @@
+package io.airbyte.integrations.destination.s3;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.security.NoSuchAlgorithmException;
+import java.util.Arrays;
+import javax.annotation.Nonnull;
+import javax.crypto.KeyGenerator;
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * @param key The key to use for encryption.
+ * @param keyType Where the key came from.
+ */
+public record AesCbcEnvelopeEncryption(@Nonnull byte[] key, @Nonnull KeyType keyType) implements EncryptionConfig {
+
+  public enum KeyType {
+    EPHEMERAL,
+    USER_PROVIDED
+  }
+
+  public static AesCbcEnvelopeEncryption fromJson(final JsonNode encryptionNode) {
+    if (!encryptionNode.has("key_encrypting_key")) {
+      return encryptionWithRandomKey();
+    }
+    final String kek = encryptionNode.get("key_encrypting_key").asText();
+    if (StringUtils.isEmpty(kek)) {
+      return encryptionWithRandomKey();
+    } else {
+      return new AesCbcEnvelopeEncryption(BASE64_DECODER.decode(kek), KeyType.USER_PROVIDED);
+    }
+  }
+
+  private static AesCbcEnvelopeEncryption encryptionWithRandomKey() {
+    try {
+      final KeyGenerator kekGenerator = KeyGenerator.getInstance(AesCbcEnvelopeEncryptionBlobDecorator.KEY_ENCRYPTING_ALGO);
+      kekGenerator.init(AesCbcEnvelopeEncryptionBlobDecorator.AES_KEY_SIZE_BITS);
+      return new AesCbcEnvelopeEncryption(kekGenerator.generateKey().getEncoded(), KeyType.EPHEMERAL);
+    } catch (final NoSuchAlgorithmException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public boolean equals(final Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    final AesCbcEnvelopeEncryption that = (AesCbcEnvelopeEncryption) o;
+
+    if (!Arrays.equals(key, that.key)) {
+      return false;
+    }
+    return keyType == that.keyType;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = Arrays.hashCode(key);
+    result = 31 * result + keyType.hashCode();
+    return result;
+  }
+}
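Usage of fromJson for the two spec cases, as a quick sketch (the JSON shapes mirror the encryption spec earlier in this commit; dispatching on encryption_type to this record vs. the no-encryption config is assumed to happen in the caller):

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Base64;

public class AesCbcEnvelopeEncryptionFromJsonSketch {

  public static void main(final String[] args) throws Exception {
    final ObjectMapper mapper = new ObjectMapper();

    // No key_encrypting_key (or a blank one) falls back to a freshly generated 256-bit KEK.
    final AesCbcEnvelopeEncryption ephemeral = AesCbcEnvelopeEncryption.fromJson(
        mapper.readTree("{\"encryption_type\": \"aes_cbc_envelope\"}"));
    System.out.println(ephemeral.keyType()); // EPHEMERAL

    // A base64-encoded key is decoded verbatim and marked USER_PROVIDED.
    final String kek = Base64.getEncoder().encodeToString(new byte[32]); // placeholder 256-bit key
    final AesCbcEnvelopeEncryption provided = AesCbcEnvelopeEncryption.fromJson(
        mapper.readTree("{\"encryption_type\": \"aes_cbc_envelope\", \"key_encrypting_key\": \"" + kek + "\"}"));
    System.out.println(provided.keyType()); // USER_PROVIDED
  }
}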
AesCbcEnvelopeEncryptionBlobDecorator.java (new file)

@@ -0,0 +1,132 @@
+package io.airbyte.integrations.destination.s3;
+
+import com.google.common.annotations.VisibleForTesting;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.OutputStream;
+import java.security.InvalidAlgorithmParameterException;
+import java.security.InvalidKeyException;
+import java.security.NoSuchAlgorithmException;
+import java.security.SecureRandom;
+import java.util.Base64;
+import java.util.Base64.Encoder;
+import java.util.Map;
+import javax.crypto.BadPaddingException;
+import javax.crypto.Cipher;
+import javax.crypto.CipherOutputStream;
+import javax.crypto.IllegalBlockSizeException;
+import javax.crypto.KeyGenerator;
+import javax.crypto.NoSuchPaddingException;
+import javax.crypto.SecretKey;
+import javax.crypto.spec.IvParameterSpec;
+import javax.crypto.spec.SecretKeySpec;
+
+/**
+ * This class implements the envelope encryption that Redshift and Snowflake use when loading encrypted files from S3 (or other blob stores):
+ * <ul>
+ * <li>A content-encrypting-key (CEK) is used to encrypt the actual data (i.e. the CSV file)</li>
+ * <li>A key-encrypting-key (KEK) is used to encrypt the CEK</li>
+ * <li>The encrypted CEK is stored in the S3 object metadata, along with the plaintext initialization vector</li>
+ * <li>The COPY command includes the KEK (in plaintext). Redshift/Snowflake will use it to decrypt the CEK, which it then uses to decrypt the CSV file.</li>
+ * </ul>
+ * <p>
+ * A new CEK is generated for each S3 object, but each sync uses a single KEK. The KEK may be either user-provided (if the user wants to keep the data
+ * for further use), or generated per-sync (if they simply want to add additional security around their COPY operation).
+ * <p>
+ * Redshift does not support loading directly from GCS or Azure Blob Storage.
+ * <p>
+ * Snowflake only supports client-side encryption in S3 and Azure Storage; it does not support this feature in GCS (https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html).
+ * Azure Storage uses a similar envelope encryption technique to S3 (https://docs.microsoft.com/en-us/azure/storage/common/storage-client-side-encryption?tabs=dotnet#encryption-via-the-envelope-technique).
+ */
+public class AesCbcEnvelopeEncryptionBlobDecorator implements BlobDecorator {
+
+  public static final String ENCRYPTED_CONTENT_ENCRYPTING_KEY = "aes_cbc_envelope_encryption-content-encrypting-key";
+  public static final String INITIALIZATION_VECTOR = "aes_cbc_envelope_encryption-initialization-vector";
+
+  public static final int AES_KEY_SIZE_BITS = 256;
+  private static final int AES_CBC_INITIALIZATION_VECTOR_SIZE_BYTES = 16;
+  private static final Encoder BASE64_ENCODER = Base64.getEncoder();
+  private static final SecureRandom SECURE_RANDOM = new SecureRandom();
+
+  public static final String KEY_ENCRYPTING_ALGO = "AES";
+
+  // There's no specific KeyGenerator for AES/CBC/PKCS5Padding, so we just use a normal AES KeyGenerator
+  private static final String CONTENT_ENCRYPTING_KEY_ALGO = "AES";
+  // Redshift's UNLOAD command uses this cipher mode, so we'll use it here as well.
+  // TODO If we eventually want to expose client-side encryption in destination-s3, we should probably also implement
+  // AES-GCM, since it's mostly superior to CBC mode. (if we do that: make sure that the output is compatible with
+  // aws-java-sdk's AmazonS3EncryptionV2Client, which requires a slightly different set of metadata)
+  private static final String CONTENT_ENCRYPTING_CIPHER_ALGO = "AES/CBC/PKCS5Padding";
+
+  // The real "secret key". Should be handled with great care.
+  private final SecretKey keyEncryptingKey;
+  // A random key generated for each file. Also should be handled with care.
+  private final SecretKey contentEncryptingKey;
+  // Arbitrary bytes required by the CBC algorithm. Not a sensitive value.
+  // The only requirement is that we never reuse an (IV, CEK) pair.
+  private final byte[] initializationVector;
+
+  public AesCbcEnvelopeEncryptionBlobDecorator(final SecretKey keyEncryptingKey) {
+    this(keyEncryptingKey, randomContentEncryptingKey(), randomInitializationVector());
+  }
+
+  public AesCbcEnvelopeEncryptionBlobDecorator(final byte[] keyEncryptingKey) {
+    this(new SecretKeySpec(keyEncryptingKey, KEY_ENCRYPTING_ALGO));
+  }
+
+  @VisibleForTesting
+  AesCbcEnvelopeEncryptionBlobDecorator(final SecretKey keyEncryptingKey, final SecretKey contentEncryptingKey, final byte[] initializationVector) {
+    this.keyEncryptingKey = keyEncryptingKey;
+    this.contentEncryptingKey = contentEncryptingKey;
+
+    this.initializationVector = initializationVector;
+  }
+
+  @SuppressFBWarnings(
+      value = {"PADORA", "CIPINT"},
+      justification = "We're using this cipher for compatibility with Redshift/Snowflake."
+  )
+  @Override
+  public OutputStream wrap(final OutputStream stream) {
+    try {
+      final Cipher dataCipher = Cipher.getInstance(CONTENT_ENCRYPTING_CIPHER_ALGO);
+      dataCipher.init(Cipher.ENCRYPT_MODE, contentEncryptingKey, new IvParameterSpec(initializationVector));
+      return new CipherOutputStream(stream, dataCipher);
+    } catch (final InvalidAlgorithmParameterException | NoSuchPaddingException | NoSuchAlgorithmException | InvalidKeyException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @SuppressFBWarnings(
+      value = {"CIPINT", "SECECB"},
+      justification = "We're using this cipher for compatibility with Redshift/Snowflake."
+  )
+  @Override
+  public void updateMetadata(final Map<String, String> metadata, final Map<String, String> metadataKeyMapping) {
+    try {
+      final Cipher keyCipher = Cipher.getInstance(KEY_ENCRYPTING_ALGO);
+      keyCipher.init(Cipher.ENCRYPT_MODE, keyEncryptingKey);
+      final byte[] encryptedCekBytes = keyCipher.doFinal(contentEncryptingKey.getEncoded());
+
+      BlobDecorator.insertMetadata(metadata, metadataKeyMapping, ENCRYPTED_CONTENT_ENCRYPTING_KEY, BASE64_ENCODER.encodeToString(encryptedCekBytes));
+      BlobDecorator.insertMetadata(metadata, metadataKeyMapping, INITIALIZATION_VECTOR, BASE64_ENCODER.encodeToString(initializationVector));
+    } catch (final NoSuchPaddingException | NoSuchAlgorithmException | InvalidKeyException | IllegalBlockSizeException | BadPaddingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static SecretKey randomContentEncryptingKey() {
+    try {
+      final KeyGenerator cekGenerator = KeyGenerator.getInstance(CONTENT_ENCRYPTING_KEY_ALGO);
+      cekGenerator.init(AES_KEY_SIZE_BITS);
+      return cekGenerator.generateKey();
+    } catch (final NoSuchAlgorithmException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static byte[] randomInitializationVector() {
+    final byte[] initializationVector = new byte[AES_CBC_INITIALIZATION_VECTOR_SIZE_BYTES];
+    SECURE_RANDOM.nextBytes(initializationVector);
+    return initializationVector;
+  }
+}
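To make the envelope scheme from the class Javadoc concrete, here is a self-contained round trip. It is a sketch under two assumptions: BlobDecorator.insertMetadata (not shown in this diff) stores each value under the key given by metadataKeyMapping, and the decrypting half below stands in for what Snowflake/Redshift do server-side with the KEK supplied in the COPY command.

import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.IvParameterSpec;
import javax.crypto.spec.SecretKeySpec;

public class EnvelopeEncryptionRoundTripSketch {

  public static void main(final String[] args) throws Exception {
    // The per-sync key-encrypting key (KEK); ephemeral here, as in AesCbcEnvelopeEncryption.
    final KeyGenerator kekGenerator = KeyGenerator.getInstance("AES");
    kekGenerator.init(256);
    final SecretKey kek = kekGenerator.generateKey();

    // Encrypt a small "staged file" through the decorator.
    final AesCbcEnvelopeEncryptionBlobDecorator decorator = new AesCbcEnvelopeEncryptionBlobDecorator(kek);
    final ByteArrayOutputStream blob = new ByteArrayOutputStream();
    try (OutputStream encrypting = decorator.wrap(blob)) {
      encrypting.write("id,name\n1,foo\n".getBytes(StandardCharsets.UTF_8));
    }

    // Collect the envelope metadata; an identity mapping keeps the canonical key names
    // (assumes insertMetadata writes under the mapped key).
    final Map<String, String> metadata = new HashMap<>();
    final Map<String, String> identityMapping = Map.of(
        AesCbcEnvelopeEncryptionBlobDecorator.ENCRYPTED_CONTENT_ENCRYPTING_KEY,
        AesCbcEnvelopeEncryptionBlobDecorator.ENCRYPTED_CONTENT_ENCRYPTING_KEY,
        AesCbcEnvelopeEncryptionBlobDecorator.INITIALIZATION_VECTOR,
        AesCbcEnvelopeEncryptionBlobDecorator.INITIALIZATION_VECTOR);
    decorator.updateMetadata(metadata, identityMapping);

    // "Warehouse side": unwrap the CEK with the KEK, then AES/CBC-decrypt the payload.
    final byte[] encryptedCek = Base64.getDecoder()
        .decode(metadata.get(AesCbcEnvelopeEncryptionBlobDecorator.ENCRYPTED_CONTENT_ENCRYPTING_KEY));
    final byte[] iv = Base64.getDecoder()
        .decode(metadata.get(AesCbcEnvelopeEncryptionBlobDecorator.INITIALIZATION_VECTOR));

    final Cipher kekCipher = Cipher.getInstance("AES");
    kekCipher.init(Cipher.DECRYPT_MODE, kek);
    final SecretKey cek = new SecretKeySpec(kekCipher.doFinal(encryptedCek), "AES");

    final Cipher dataCipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
    dataCipher.init(Cipher.DECRYPT_MODE, cek, new IvParameterSpec(iv));
    final byte[] plaintext = dataCipher.doFinal(blob.toByteArray());
    System.out.println(new String(plaintext, StandardCharsets.UTF_8)); // prints the original CSV
  }
}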