From 62e719471162570e5c6aaac04b6bed5f1a626608 Mon Sep 17 00:00:00 2001 From: Jeff Cowan Date: Mon, 27 Feb 2023 15:25:07 -0800 Subject: [PATCH 1/9] Redshift Destination: Add SSH Tunnelling Config Option Issue: https://github.com/airbytehq/airbyte/issues/12131 User would like to be able to access their Redshift Destination database via an SSH Tunnelling config (SSH Bastion Host), just as in Postgres, etc. destinations. Acceptance tests pass and exercise both Insert and S3 type loading options as well as password and key based authentication with the SSH bastion host. Airbyters: Credentials/configs for acceptance testing against AWS Redshift are updated in Google Cloud Secret Manager. Be sure to get both the config.json and config_staging.json as well as the new PEM file --- .../redshift/RedshiftDestination.java | 10 +- .../redshift/RedshiftInsertDestination.java | 6 + .../RedshiftStagingS3Destination.java | 5 + ...dshiftInsertDestinationAcceptanceTest.java | 29 +++ ...shiftStagingDestinationAcceptanceTest.java | 27 +++ ...RedshiftDestinationBaseAcceptanceTest.java | 204 ++++++++++++++++++ 6 files changed, 275 insertions(+), 6 deletions(-) create mode 100644 airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshKeyRedshiftInsertDestinationAcceptanceTest.java create mode 100644 airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshPasswordRedshiftStagingDestinationAcceptanceTest.java create mode 100644 airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshRedshiftDestinationBaseAcceptanceTest.java diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftDestination.java b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftDestination.java index b00d965b14bc..041221d19e16 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftDestination.java +++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftDestination.java @@ -27,17 +27,16 @@ public class RedshiftDestination extends SwitchingDestination { private static final Logger LOGGER = LoggerFactory.getLogger(RedshiftDestination.class); - private static final String METHOD = "method"; - - private static final Map destinationMap = Map.of( - DestinationType.STANDARD, new RedshiftInsertDestination(), - DestinationType.COPY_S3, new RedshiftStagingS3Destination()); enum DestinationType { STANDARD, COPY_S3 } + private static final Map destinationMap = Map.of( + DestinationType.STANDARD, RedshiftInsertDestination.sshWrappedDestination(), + DestinationType.COPY_S3, RedshiftStagingS3Destination.sshWrappedDestination()); + public RedshiftDestination() { super(DestinationType.class, RedshiftDestination::getTypeFromConfig, destinationMap); } @@ -47,7 +46,6 @@ private static DestinationType getTypeFromConfig(final JsonNode config) { } public static DestinationType determineUploadMode(final JsonNode config) { - final JsonNode jsonNode = findS3Options(config); if (anyOfS3FieldsAreNullOrEmpty(jsonNode)) { diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftInsertDestination.java 
b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftInsertDestination.java index 945bc5bb1663..08b9e21f3e27 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftInsertDestination.java +++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftInsertDestination.java @@ -13,6 +13,8 @@ import io.airbyte.db.jdbc.DefaultJdbcDatabase; import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.db.jdbc.JdbcUtils; +import io.airbyte.integrations.base.Destination; +import io.airbyte.integrations.base.ssh.SshWrappedDestination; import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination; import io.airbyte.integrations.destination.redshift.operations.RedshiftSqlOperations; import java.util.Map; @@ -26,6 +28,10 @@ public class RedshiftInsertDestination extends AbstractJdbcDestination { JdbcUtils.SSL_KEY, "true", "sslfactory", "com.amazon.redshift.ssl.NonValidatingFactory"); + public static Destination sshWrappedDestination() { + return new SshWrappedDestination(new RedshiftInsertDestination(), JdbcUtils.HOST_LIST_KEY, JdbcUtils.PORT_LIST_KEY); + } + public RedshiftInsertDestination() { super(DRIVER_CLASS, new RedshiftSQLNameTransformer(), new RedshiftSqlOperations()); } diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3Destination.java b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3Destination.java index 3e6f8dfe7d38..40d93ad028e0 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3Destination.java +++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3Destination.java @@ -22,6 +22,7 @@ import io.airbyte.integrations.base.AirbyteMessageConsumer; import io.airbyte.integrations.base.AirbyteTraceMessageUtility; import io.airbyte.integrations.base.Destination; +import io.airbyte.integrations.base.ssh.SshWrappedDestination; import io.airbyte.integrations.destination.NamingConventionTransformer; import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination; import io.airbyte.integrations.destination.record_buffer.FileBuffer; @@ -50,6 +51,10 @@ public class RedshiftStagingS3Destination extends AbstractJdbcDestination implem private static final Logger LOGGER = LoggerFactory.getLogger(RedshiftStagingS3Destination.class); + public static Destination sshWrappedDestination() { + return new SshWrappedDestination(new RedshiftStagingS3Destination(), JdbcUtils.HOST_LIST_KEY, JdbcUtils.PORT_LIST_KEY); + } + public RedshiftStagingS3Destination() { super(RedshiftInsertDestination.DRIVER_CLASS, new RedshiftSQLNameTransformer(), new RedshiftSqlOperations()); } diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshKeyRedshiftInsertDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshKeyRedshiftInsertDestinationAcceptanceTest.java new file mode 100644 index 000000000000..dba690de9214 --- /dev/null +++ 
b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshKeyRedshiftInsertDestinationAcceptanceTest.java @@ -0,0 +1,29 @@ +package io.airbyte.integrations.destination.redshift; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.io.IOs; +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.base.ssh.SshTunnel.TunnelMethod; +import java.io.IOException; +import java.nio.file.Path; + +/* + * SshKeyRedshiftInsertDestinationAcceptanceTest runs basic Redshift Destination Tests using the SQL Insert + * mechanism for upload of data and "key" authentication for the SSH bastion configuration. + * + * Check Google Secrets for a new secret config called "SECRET_DESTINATION-REDSHIFT-BASTION__KEY". Place the contents + * in your secrets directory under 'redshift-bastion-key-pair.pem' + */ +public class SshKeyRedshiftInsertDestinationAcceptanceTest extends SshRedshiftDestinationBaseAcceptanceTest { + + @Override + public TunnelMethod getTunnelMethod() { + return TunnelMethod.SSH_KEY_AUTH; + } + + public JsonNode getStaticConfig() throws IOException { + final Path configPath = Path.of("secrets/config.json"); + final String configAsString = IOs.readFile(configPath); + return Jsons.deserialize(configAsString); + } +} diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshPasswordRedshiftStagingDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshPasswordRedshiftStagingDestinationAcceptanceTest.java new file mode 100644 index 000000000000..e9fedcee6e72 --- /dev/null +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshPasswordRedshiftStagingDestinationAcceptanceTest.java @@ -0,0 +1,27 @@ +package io.airbyte.integrations.destination.redshift; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.io.IOs; +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.base.ssh.SshTunnel.TunnelMethod; +import java.io.IOException; +import java.nio.file.Path; + +/* + * SshPasswordRedshiftStagingDestinationAcceptanceTest runs basic Redshift Destination Tests using the S3 Staging + * mechanism for upload of data and "password" authentication for the SSH bastion configuration. 
+ */ +public class SshPasswordRedshiftStagingDestinationAcceptanceTest extends SshRedshiftDestinationBaseAcceptanceTest { + + @Override + public TunnelMethod getTunnelMethod() { + return TunnelMethod.SSH_PASSWORD_AUTH; + } + + @Override + public JsonNode getStaticConfig() throws IOException { + final Path configPath = Path.of("secrets/config_staging.json"); + final String configAsString = IOs.readFile(configPath); + return Jsons.deserialize(configAsString); + } +} diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshRedshiftDestinationBaseAcceptanceTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshRedshiftDestinationBaseAcceptanceTest.java new file mode 100644 index 000000000000..358fb8d99338 --- /dev/null +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshRedshiftDestinationBaseAcceptanceTest.java @@ -0,0 +1,204 @@ +package io.airbyte.integrations.destination.redshift; + +import static io.airbyte.integrations.base.ssh.SshTunnel.TunnelMethod.SSH_KEY_AUTH; +import static io.airbyte.integrations.base.ssh.SshTunnel.TunnelMethod.SSH_PASSWORD_AUTH; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableMap.Builder; +import io.airbyte.commons.jackson.MoreMappers; +import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator; +import java.util.Map; +import io.airbyte.commons.io.IOs; +import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.string.Strings; +import io.airbyte.db.Database; +import io.airbyte.db.factory.DSLContextFactory; +import io.airbyte.db.factory.DatabaseDriver; +import io.airbyte.db.jdbc.JdbcUtils; +import io.airbyte.integrations.base.JavaBaseConstants; +import io.airbyte.integrations.base.ssh.SshTunnel; +import io.airbyte.integrations.destination.redshift.operations.RedshiftSqlOperations; +import io.airbyte.integrations.standardtest.destination.JdbcDestinationAcceptanceTest; +import java.io.IOException; +import java.nio.file.Path; +import java.util.List; +import java.util.stream.Collectors; + +public abstract class SshRedshiftDestinationBaseAcceptanceTest extends JdbcDestinationAcceptanceTest { + + protected String schemaName; + // config from which to create / delete schemas. + protected JsonNode baseConfig; + // config which refers to the schema that the test is being run in. 
+ protected JsonNode config; + + private final RedshiftSQLNameTransformer namingResolver = new RedshiftSQLNameTransformer(); + private final String USER_WITHOUT_CREDS = Strings.addRandomSuffix("test_user", "_", 5); + + public abstract SshTunnel.TunnelMethod getTunnelMethod(); + + @Override + protected String getImageName() { + return "airbyte/destination-redshift:dev"; + } + + @Override + protected JsonNode getConfig() throws Exception { + final Map configAsMap = deserializeToObjectMap(config); + final Builder configMapBuilder = new Builder<>().putAll(configAsMap); + return getTunnelConfig(getTunnelMethod(), configMapBuilder); + } + + protected JsonNode getTunnelConfig(final SshTunnel.TunnelMethod tunnelMethod, final ImmutableMap.Builder builderWithSchema) { + final Path keyPath = Path.of("secrets/redshift-bastion-key-pair.pem"); + final String keyAsString = IOs.readFile(keyPath); + + final JsonNode sshBastionHost = config.get("ssh_bastion_host"); + final JsonNode sshBastionPort = config.get("ssh_bastion_port"); + final JsonNode sshBastionUser = config.get("ssh_bastion_user"); + final JsonNode sshBastionPassword = config.get("ssh_bastion_password"); + + final String tunnelUserPassword = tunnelMethod.equals(SSH_PASSWORD_AUTH) ? sshBastionPassword.asText() : ""; + final String sshKey = tunnelMethod.equals(SSH_KEY_AUTH) ? keyAsString : ""; + + return Jsons.jsonNode(builderWithSchema + .put("tunnel_method", Jsons.jsonNode(ImmutableMap.builder() + .put("tunnel_host", sshBastionHost) + .put("tunnel_method", tunnelMethod.toString()) + .put("tunnel_port", sshBastionPort.intValue()) + .put("tunnel_user", sshBastionUser) + .put("tunnel_user_password", tunnelUserPassword) + .put("ssh_key", sshKey) + .build())) + .build()); + } + + public static Map deserializeToObjectMap(final JsonNode json) { + final ObjectMapper objectMapper = MoreMappers.initMapper(); + return objectMapper.convertValue(json, new TypeReference<>() { + }); + } + + public abstract JsonNode getStaticConfig() throws IOException; + + @Override + protected JsonNode getFailCheckConfig() { + final JsonNode invalidConfig = Jsons.clone(config); + ((ObjectNode) invalidConfig).put("password", "wrong password"); + return invalidConfig; + } + + @Override + protected boolean implementsNamespaces() { + return true; + } + + @Override + protected List retrieveNormalizedRecords(final TestDestinationEnv env, final String streamName, final String namespace) + throws Exception { + final String tableName = namingResolver.getIdentifier(streamName); + return retrieveRecordsFromTable(tableName, namespace); + } + + @Override + protected List retrieveRecords(final TestDestinationEnv testEnv, final String streamName, final String namespace, + final JsonNode streamSchema) throws Exception { + return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namespace) + .stream() + .map(j -> j.get(JavaBaseConstants.COLUMN_NAME_DATA)) + .collect(Collectors.toList()); + } + + private List retrieveRecordsFromTable(final String tableName, final String schemaName) throws Exception { + return SshTunnel.sshWrap( + getConfig(), + JdbcUtils.HOST_LIST_KEY, + JdbcUtils.PORT_LIST_KEY, + config -> { + return getDatabaseFromConfig(config).query(ctx -> ctx + .fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT)) + .stream() + .map(this::getJsonFromRecord) + .collect(Collectors.toList())); + }); + + } + + @Override + protected TestDataComparator getTestDataComparator() { + return new 
RedshiftTestDataComparator(); } + + private static Database getDatabaseFromConfig(final JsonNode config) { + return new Database( + DSLContextFactory.create( + config.get(JdbcUtils.USERNAME_KEY).asText(), + config.get(JdbcUtils.PASSWORD_KEY).asText(), + DatabaseDriver.REDSHIFT.getDriverClassName(), + String.format(DatabaseDriver.REDSHIFT.getUrlFormatString(), + config.get(JdbcUtils.HOST_KEY).asText(), + config.get(JdbcUtils.PORT_KEY).asInt(), + config.get(JdbcUtils.DATABASE_KEY).asText()), + null, + RedshiftInsertDestination.SSL_JDBC_PARAMETERS)); + } + + @Override + protected int getMaxRecordValueLimit() { + return RedshiftSqlOperations.REDSHIFT_VARCHAR_MAX_BYTE_SIZE; + } + + @Override + protected void setup(final TestDestinationEnv testEnv) throws Exception { + baseConfig = getStaticConfig(); + final JsonNode configForSchema = Jsons.clone(baseConfig); + schemaName = Strings.addRandomSuffix("integration_test", "_", 5); + ((ObjectNode) configForSchema).put("schema", schemaName); + config = configForSchema; + + // create the schema + SshTunnel.sshWrap( + getConfig(), + JdbcUtils.HOST_LIST_KEY, + JdbcUtils.PORT_LIST_KEY, + config -> { + getDatabaseFromConfig(config).query(ctx -> ctx.fetch(String.format("CREATE SCHEMA %s;", schemaName))); + }); + + // create the user + SshTunnel.sshWrap( + getConfig(), + JdbcUtils.HOST_LIST_KEY, + JdbcUtils.PORT_LIST_KEY, + config -> { + getDatabaseFromConfig(config).query(ctx -> ctx.fetch(String.format("CREATE USER %s WITH PASSWORD '%s';", + USER_WITHOUT_CREDS, baseConfig.get("password").asText()))); + }); + } + + @Override + protected void tearDown(final TestDestinationEnv testEnv) throws Exception { + // blow away the test schema at the end. + SshTunnel.sshWrap( + getConfig(), + JdbcUtils.HOST_LIST_KEY, + JdbcUtils.PORT_LIST_KEY, + config -> { + getDatabaseFromConfig(config).query(ctx -> ctx.fetch(String.format("DROP SCHEMA IF EXISTS %s CASCADE;", schemaName))); + }); + + // blow away the user at the end. + SshTunnel.sshWrap( + getConfig(), + JdbcUtils.HOST_LIST_KEY, + JdbcUtils.PORT_LIST_KEY, + config -> { + getDatabaseFromConfig(config).query(ctx -> ctx.fetch(String.format("DROP USER IF EXISTS %s;", USER_WITHOUT_CREDS))); + }); + } + +}
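The base acceptance class above pulls the bastion coordinates from the same secrets config as the Redshift credentials and, for key auth, reads the bastion's private key from a PEM file (a later patch in this series moves the key into the config itself). A rough pre-flight sketch for wiring those secrets up locally, using only the paths and field names referenced by the tests and assuming the airbyte-commons test classpath; this is an illustration, not code from the patch:

```java
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

public class SecretsPreflightSketch {

  public static void main(final String[] args) {
    // secrets/config.json drives the insert tests; secrets/config_staging.json drives the staging tests.
    final JsonNode config = Jsons.deserialize(IOs.readFile(Path.of("secrets/config.json")));

    // Bastion fields the SSH acceptance tests read out of the secrets config (see getTunnelConfig above).
    for (final String field : List.of("ssh_bastion_host", "ssh_bastion_port", "ssh_bastion_user", "ssh_bastion_password")) {
      if (!config.hasNonNull(field)) {
        throw new IllegalStateException("secrets config is missing " + field);
      }
    }

    // Key-auth tests additionally read the bastion private key from this PEM file.
    if (!Files.exists(Path.of("secrets/redshift-bastion-key-pair.pem"))) {
      throw new IllegalStateException("missing secrets/redshift-bastion-key-pair.pem");
    }
  }
}
```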
From e7673ac0e39c2c56370e2f8cfc1091c8fe36c434 Mon Sep 17 00:00:00 2001 From: "Jeff Cowan (Airbyte)" <4992320+jcowanpdx@users.noreply.github.com> Date: Mon, 27 Feb 2023 21:01:17 -0800 Subject: [PATCH 3/9] README.md updates --- .../connectors/destination-redshift/README.md | 35 ++++++++++--------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/airbyte-integrations/connectors/destination-redshift/README.md b/airbyte-integrations/connectors/destination-redshift/README.md index 0c7c6b73cc47..1f0ef7a6d187 100644 --- a/airbyte-integrations/connectors/destination-redshift/README.md +++ b/airbyte-integrations/connectors/destination-redshift/README.md @@ -2,23 +2,26 @@ This is the repository for the Redshift Destination Connector. +This connector can run in one of two modes: + +- Direct Insert: using SQL to directly place data into tables. +- S3 Staging: Data files are uploaded to the customer's S3 and a load is done into the database from these files directly. This is a directly + supported feature of Redshift. Consult Redshift documentation for more information and permissions. + +This connector has a capability to query the database via an SSH Tunnel (bastion host). This can be useful for environments where Redshift has not +been exposed to the internet.
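In practice the tunnel surfaces to users as a `tunnel_method` object inside the destination config, the same standard SSH block used by other Airbyte database connectors. A minimal sketch of the key-auth variant, assembled the same way the acceptance tests assemble it (field names match the spec added later in this series; the host, user and key values are placeholders):

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;

public class KeyAuthTunnelSketch {

  public static void main(final String[] args) {
    // Illustrative only: the tunnel_method fragment for key-based bastion auth.
    final JsonNode tunnelMethod = Jsons.jsonNode(ImmutableMap.builder()
        .put("tunnel_method", "SSH_KEY_AUTH")       // other options: NO_TUNNEL, SSH_PASSWORD_AUTH
        .put("tunnel_host", "bastion.example.com")  // placeholder jump-server hostname
        .put("tunnel_port", 22)                     // ssh port on the jump server
        .put("tunnel_user", "ec2-user")             // placeholder OS-level user on the jump server
        .put("ssh_key", "-----BEGIN RSA PRIVATE KEY-----\n<key material>\n-----END RSA PRIVATE KEY-----")
        .build());
    System.out.println(tunnelMethod);
  }
}
```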
+ ## Testing -The `AirbyteCopier.java` class contains an ignored local test class. This is ignored as -the necessary components cannot be mocked locally. The class requires credentials in order -to run all its tests. - -Users have the option of either filling in the variables defined at the top of the class, -or filling in a `config.properties` file in `src/test/resources` with the following property keys: -```aidl -s3.keyId= -s3.accessKey= - -redshift.connString= -redshift.user= -redshift.pass= -``` + +Unit tests are run as usual. + +Integration/Acceptance tests are run via the command line with secrets managed out of Google Cloud Secrets Manager. +Consult the integration test area for Redshift. + ## Actual secrets -The actual secrets for integration tests could be found in Google Secrets Manager. It could be found by next labels: + +The actual secrets for integration tests can be found in Google Cloud Secrets Manager. Search on redshift for the labels: + - SECRET_DESTINATION-REDSHIFT__CREDS - used for Standard tests. (__config.json__) - SECRET_DESTINATION-REDSHIFT_STAGING__CREDS - used for S3 Staging tests. (__config_staging.json__) - +- SECRET_DESTINATION-REDSHIFT-BASTION__KEY - secret key for the EC2 bastion host (__redshift-bastion-key-pair.pem__) From 6c82cbe8fc3ac13b908148bd15d930f8429af806 Mon Sep 17 00:00:00 2001 From: "Jeff Cowan (Airbyte)" <4992320+jcowanpdx@users.noreply.github.com> Date: Tue, 28 Feb 2023 11:37:49 -0800 Subject: [PATCH 4/9] Move the pem info into the secrets config files. Update the version id. Touch up the docs. --- .../destination-redshift/Dockerfile | 2 +- .../connectors/destination-redshift/README.md | 1 - .../redshift/RedshiftDestination.java | 25 +++++++++++++------ ...dshiftInsertDestinationAcceptanceTest.java | 3 --- ...RedshiftDestinationBaseAcceptanceTest.java | 6 ++--- docs/integrations/destinations/redshift.md | 6 +++++ 6 files changed, 27 insertions(+), 16 deletions(-) diff --git a/airbyte-integrations/connectors/destination-redshift/Dockerfile b/airbyte-integrations/connectors/destination-redshift/Dockerfile index 6a56861b7fa5..f05f4bf54679 100644 --- a/airbyte-integrations/connectors/destination-redshift/Dockerfile +++ b/airbyte-integrations/connectors/destination-redshift/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION destination-redshift COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.3.56 +LABEL io.airbyte.version=0.3.57 LABEL io.airbyte.name=airbyte/destination-redshift diff --git a/airbyte-integrations/connectors/destination-redshift/README.md b/airbyte-integrations/connectors/destination-redshift/README.md index 1f0ef7a6d187..ecea733473a3 100644 --- a/airbyte-integrations/connectors/destination-redshift/README.md +++ b/airbyte-integrations/connectors/destination-redshift/README.md @@ -24,4 +24,3 @@ The actual secrets for integration tests can be found in Google Cloud Secrets Ma - SECRET_DESTINATION-REDSHIFT__CREDS - used for Standard tests. (__config.json__) - SECRET_DESTINATION-REDSHIFT_STAGING__CREDS - used for S3 Staging tests. 
(__config_staging.json__) -- SECRET_DESTINATION-REDSHIFT-BASTION__KEY - secret key for the EC2 bastion host (__redshift-bastion-key-pair.pem__) diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftDestination.java b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftDestination.java index 041221d19e16..accc74e06d1e 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftDestination.java +++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftDestination.java @@ -8,21 +8,23 @@ import static io.airbyte.integrations.destination.redshift.util.RedshiftUtil.findS3Options; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.resources.MoreResources; import io.airbyte.integrations.base.Destination; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.destination.jdbc.copy.SwitchingDestination; +import io.airbyte.protocol.models.v0.ConnectorSpecification; import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * The Redshift Destination offers two replication strategies. The first inserts via a typical SQL - * Insert statement. Although less efficient, this requires less user set up. See - * {@link RedshiftInsertDestination} for more detail. The second inserts via streaming the data to - * an S3 bucket, and Cop-ing the date into Redshift. This is more efficient, and recommended for - * production workloads, but does require users to set up an S3 bucket and pass in additional - * credentials. See {@link RedshiftStagingS3Destination} for more detail. This class inspect the - * given arguments to determine which strategy to use. + * The Redshift Destination offers two replication strategies. The first inserts via a typical SQL Insert statement. Although less efficient, this + * requires less user set up. See {@link RedshiftInsertDestination} for more detail. The second inserts via streaming the data to an S3 bucket, and + * Cop-ing the date into Redshift. This is more efficient, and recommended for production workloads, but does require users to set up an S3 bucket and + * pass in additional credentials. See {@link RedshiftStagingS3Destination} for more detail. This class inspect the given arguments to determine which + * strategy to use. */ public class RedshiftDestination extends SwitchingDestination { @@ -56,6 +58,15 @@ public static DestinationType determineUploadMode(final JsonNode config) { return DestinationType.COPY_S3; } + @Override + public ConnectorSpecification spec() throws Exception { + // inject the standard ssh configuration into the spec. 
+ final ConnectorSpecification originalSpec = super.spec(); + final ObjectNode propNode = (ObjectNode) originalSpec.getConnectionSpecification().get("properties"); + propNode.set("tunnel_method", Jsons.deserialize(MoreResources.readResource("ssh-tunnel-spec.json"))); + return originalSpec; + } + public static void main(final String[] args) throws Exception { final Destination destination = new RedshiftDestination(); LOGGER.info("starting destination: {}", RedshiftDestination.class); diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshKeyRedshiftInsertDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshKeyRedshiftInsertDestinationAcceptanceTest.java index dba690de9214..ed2e18fb935b 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshKeyRedshiftInsertDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshKeyRedshiftInsertDestinationAcceptanceTest.java @@ -10,9 +10,6 @@ /* * SshKeyRedshiftInsertDestinationAcceptanceTest runs basic Redshift Destination Tests using the SQL Insert * mechanism for upload of data and "key" authentication for the SSH bastion configuration. - * - * Check Google Secrets for a new secret config called "SECRET_DESTINATION-REDSHIFT-BASTION__KEY". Place the contents - * in your secrets directory under 'redshift-bastion-key-pair.pem' */ public class SshKeyRedshiftInsertDestinationAcceptanceTest extends SshRedshiftDestinationBaseAcceptanceTest { diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshRedshiftDestinationBaseAcceptanceTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshRedshiftDestinationBaseAcceptanceTest.java index 358fb8d99338..27a2a9a39cf3 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshRedshiftDestinationBaseAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshRedshiftDestinationBaseAcceptanceTest.java @@ -54,16 +54,14 @@ protected JsonNode getConfig() throws Exception { } protected JsonNode getTunnelConfig(final SshTunnel.TunnelMethod tunnelMethod, final ImmutableMap.Builder builderWithSchema) { - final Path keyPath = Path.of("secrets/redshift-bastion-key-pair.pem"); - final String keyAsString = IOs.readFile(keyPath); - final JsonNode sshBastionHost = config.get("ssh_bastion_host"); final JsonNode sshBastionPort = config.get("ssh_bastion_port"); final JsonNode sshBastionUser = config.get("ssh_bastion_user"); final JsonNode sshBastionPassword = config.get("ssh_bastion_password"); + final JsonNode sshBastionKey = config.get("ssh_bastion_key"); final String tunnelUserPassword = tunnelMethod.equals(SSH_PASSWORD_AUTH) ? sshBastionPassword.asText() : ""; - final String sshKey = tunnelMethod.equals(SSH_KEY_AUTH) ? keyAsString : ""; + final String sshKey = tunnelMethod.equals(SSH_KEY_AUTH) ? 
sshBastionKey.asText() : ""; return Jsons.jsonNode(builderWithSchema .put("tunnel_method", Jsons.jsonNode(ImmutableMap.builder() diff --git a/docs/integrations/destinations/redshift.md b/docs/integrations/destinations/redshift.md index aa20acc9d276..9d75327c0b46 100644 --- a/docs/integrations/destinations/redshift.md +++ b/docs/integrations/destinations/redshift.md @@ -56,6 +56,11 @@ Optional parameters: 4. (Optional) [Allow](https://aws.amazon.com/premiumsupport/knowledge-center/cannot-connect-redshift-cluster/) connections from Airbyte to your Redshift cluster \(if they exist in separate VPCs\) 5. (Optional) [Create](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket-overview.html) a staging S3 bucket \(for the COPY strategy\). +### Optional Use of SSH Bastion Host +This connector supports the use of a Bastion host as a gateway to a private Redshift cluster via SSH Tunneling. +Setup of the host is beyond the scope of this document but several tutorials are available online to fascilitate this task. +Enter the bastion host, port and credentials in the destination configuration. + ## Step 2: Set up the destination connector in Airbyte **For Airbyte Cloud:** @@ -141,6 +146,7 @@ Each stream will be output into its own raw table in Redshift. Each table will c | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.3.57 | 2023-02-28 | [\#23523](https://github.com/airbytehq/airbyte/pull/23523) | Add SSH Bastion Host configuration options | | 0.3.56 | 2023-01-26 | [\#21890](https://github.com/airbytehq/airbyte/pull/21890) | Fixed configurable parameter for number of file buffers | | 0.3.55 | 2023-01-26 | [\#20631](https://github.com/airbytehq/airbyte/pull/20631) | Added support for destination checkpointing with staging | | 0.3.54 | 2023-01-18 | [\#21087](https://github.com/airbytehq/airbyte/pull/21087) | Wrap Authentication Errors as Config Exceptions | From 360e4736fd6451dc3fd8f8a7292488f3b272dd48 Mon Sep 17 00:00:00 2001 From: "Jeff Cowan (Airbyte)" <4992320+jcowanpdx@users.noreply.github.com> Date: Tue, 28 Feb 2023 14:18:19 -0800 Subject: [PATCH 5/9] bump to 0.4.0 --- airbyte-integrations/connectors/destination-redshift/Dockerfile | 2 +- docs/integrations/destinations/redshift.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/destination-redshift/Dockerfile b/airbyte-integrations/connectors/destination-redshift/Dockerfile index f05f4bf54679..7a8db1bf4588 100644 --- a/airbyte-integrations/connectors/destination-redshift/Dockerfile +++ b/airbyte-integrations/connectors/destination-redshift/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION destination-redshift COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.3.57 +LABEL io.airbyte.version=0.4.0 LABEL io.airbyte.name=airbyte/destination-redshift diff --git a/docs/integrations/destinations/redshift.md b/docs/integrations/destinations/redshift.md index 9d75327c0b46..c5833aa11597 100644 --- a/docs/integrations/destinations/redshift.md +++ b/docs/integrations/destinations/redshift.md @@ -146,7 +146,7 @@ Each stream will be output into its own raw table in Redshift. 
Each table will c | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| 0.3.57 | 2023-02-28 | [\#23523](https://github.com/airbytehq/airbyte/pull/23523) | Add SSH Bastion Host configuration options | +| 0.4.0 | 2023-02-28 | [\#23523](https://github.com/airbytehq/airbyte/pull/23523) | Add SSH Bastion Host configuration options | | 0.3.56 | 2023-01-26 | [\#21890](https://github.com/airbytehq/airbyte/pull/21890) | Fixed configurable parameter for number of file buffers | | 0.3.55 | 2023-01-26 | [\#20631](https://github.com/airbytehq/airbyte/pull/20631) | Added support for destination checkpointing with staging | | 0.3.54 | 2023-01-18 | [\#21087](https://github.com/airbytehq/airbyte/pull/21087) | Wrap Authentication Errors as Config Exceptions | From 0c933ec306fbe5ef05ca76e60ae7dbd44a8f285b Mon Sep 17 00:00:00 2001 From: "Jeff Cowan (Airbyte)" <4992320+jcowanpdx@users.noreply.github.com> Date: Thu, 2 Mar 2023 11:46:30 -0800 Subject: [PATCH 6/9] adding a note about S3 staging NOT using the SSH Tunnel configuration, if provided. --- docs/integrations/destinations/redshift.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/integrations/destinations/redshift.md b/docs/integrations/destinations/redshift.md index c5833aa11597..2f4e0770ef7d 100644 --- a/docs/integrations/destinations/redshift.md +++ b/docs/integrations/destinations/redshift.md @@ -46,6 +46,7 @@ Optional parameters: * **Purge Staging Data** * Whether to delete the staging files from S3 after completing the sync. Specifically, the connector will create CSV files named `bucketPath/namespace/streamName/syncDate_epochMillis_randomUuid.csv` containing three columns (`ab_id`, `data`, `emitted_at`). Normally these files are deleted after the `COPY` command completes; if you want to keep them for other purposes, set `purge_staging_data` to `false`. +NOTE: S3 staging does not use the SSH Tunnel option, if configured. SSH Tunnel supports the SQL connection only. S3 is secured through public HTTPS access only. 
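To underline that note: only the JDBC leg is tunnelled. In the acceptance tests added by this patch, `SshTunnel.sshWrap` opens the tunnel, hands the callback a config whose host/port point at the local end of the tunnel, and closes it afterwards; S3 staging uploads go straight to S3 over HTTPS regardless of the tunnel settings. A condensed sketch of that call pattern (same call shape as the test code above; the callback body is illustrative and assumes the Airbyte base and db modules on the classpath):

```java
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.ssh.SshTunnel;

public class TunnelScopeSketch {

  // Returns the host:port the JDBC layer actually connects to while the tunnel is open.
  static String tunnelledEndpoint(final JsonNode configWithTunnel) throws Exception {
    return SshTunnel.sshWrap(
        configWithTunnel,
        JdbcUtils.HOST_LIST_KEY,
        JdbcUtils.PORT_LIST_KEY,
        config -> config.get(JdbcUtils.HOST_KEY).asText() + ":" + config.get(JdbcUtils.PORT_KEY).asText());
  }
}
```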
## Step 1: Set up Redshift From c915214797334428e15f0d6183c1c80c8236fff4 Mon Sep 17 00:00:00 2001 From: "Jeff Cowan (Airbyte)" <4992320+jcowanpdx@users.noreply.github.com> Date: Mon, 6 Mar 2023 11:05:45 -0800 Subject: [PATCH 7/9] limit idle connection time for redshift users --- .../redshift/RedshiftStagingS3DestinationAcceptanceTest.java | 2 +- .../redshift/SshRedshiftDestinationBaseAcceptanceTest.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3DestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3DestinationAcceptanceTest.java index a09ca38b2696..d1f41ea1ebac 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3DestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3DestinationAcceptanceTest.java @@ -204,7 +204,7 @@ protected void setup(final TestDestinationEnv testEnv) throws Exception { final String createSchemaQuery = String.format("CREATE SCHEMA %s", schemaName); baseConfig = getStaticConfig(); getDatabase().query(ctx -> ctx.execute(createSchemaQuery)); - final String createUser = String.format("create user %s with password '%s';", + final String createUser = String.format("create user %s with password '%s' SESSION TIMEOUT 60;", USER_WITHOUT_CREDS, baseConfig.get("password").asText()); getDatabase().query(ctx -> ctx.execute(createUser)); final JsonNode configForSchema = Jsons.clone(baseConfig); diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshRedshiftDestinationBaseAcceptanceTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshRedshiftDestinationBaseAcceptanceTest.java index 27a2a9a39cf3..81ed12fc1650 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshRedshiftDestinationBaseAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshRedshiftDestinationBaseAcceptanceTest.java @@ -173,7 +173,7 @@ protected void setup(final TestDestinationEnv testEnv) throws Exception { JdbcUtils.HOST_LIST_KEY, JdbcUtils.PORT_LIST_KEY, config -> { - getDatabaseFromConfig(config).query(ctx -> ctx.fetch(String.format("CREATE USER %s WITH PASSWORD '%s';", + getDatabaseFromConfig(config).query(ctx -> ctx.fetch(String.format("CREATE USER %s WITH PASSWORD '%s' SESSION TIMEOUT 60;", USER_WITHOUT_CREDS, baseConfig.get("password").asText()))); }); } From dbaf59eb0b4653fab0285e3542ee0e6dcff3ede5 Mon Sep 17 00:00:00 2001 From: "Jeff Cowan (Airbyte)" <4992320+jcowanpdx@users.noreply.github.com> Date: Mon, 6 Mar 2023 12:42:33 -0800 Subject: [PATCH 8/9] make base class abstract to prevent double running the same tests --- .../redshift/RedshiftStagingS3DestinationAcceptanceTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3DestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3DestinationAcceptanceTest.java index d1f41ea1ebac..a260ffa166bd 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3DestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3DestinationAcceptanceTest.java @@ -36,7 +36,7 @@ * Integration test testing {@link RedshiftStagingS3Destination}. The default Redshift integration * test credentials contain S3 credentials - this automatically causes COPY to be selected. */ -public class RedshiftStagingS3DestinationAcceptanceTest extends JdbcDestinationAcceptanceTest { +public abstract class RedshiftStagingS3DestinationAcceptanceTest extends JdbcDestinationAcceptanceTest { private static final Logger LOGGER = LoggerFactory.getLogger(RedshiftStagingS3DestinationAcceptanceTest.class); From acc8ceee8ffd14f26d9f6aa88286ec73fdfdae4f Mon Sep 17 00:00:00 2001 From: Octavia Squidington III Date: Mon, 6 Mar 2023 21:48:44 +0000 Subject: [PATCH 9/9] auto-bump connector version --- .../seed/destination_definitions.yaml | 2 +- .../resources/seed/destination_specs.yaml | 103 +++++++++++++++++- connectors.md | 2 +- 3 files changed, 104 insertions(+), 3 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index 3a9b5e19f744..6a9e600bf269 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -290,7 +290,7 @@ - name: Redshift destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc dockerRepository: airbyte/destination-redshift - dockerImageTag: 0.3.56 + dockerImageTag: 0.4.0 documentationUrl: https://docs.airbyte.com/integrations/destinations/redshift icon: redshift.svg normalizationConfig: diff --git a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml index 485bac02c911..c112f3cea371 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml @@ -5055,7 +5055,7 @@ supported_destination_sync_modes: - "overwrite" - "append" -- dockerImage: "airbyte/destination-redshift:0.3.56" +- dockerImage: "airbyte/destination-redshift:0.4.0" spec: documentationUrl: "https://docs.airbyte.com/integrations/destinations/redshift" connectionSpecification: @@ -5282,6 +5282,107 @@ \ has deteriorating effects" examples: - "10" + tunnel_method: + type: "object" + title: "SSH Tunnel Method" + description: "Whether to initiate an SSH tunnel before connecting to the\ + \ database, and if so, which kind of authentication to use." 
From acc8ceee8ffd14f26d9f6aa88286ec73fdfdae4f Mon Sep 17 00:00:00 2001
From: Octavia Squidington III
Date: Mon, 6 Mar 2023 21:48:44 +0000
Subject: [PATCH 9/9] auto-bump connector version

---
 .../seed/destination_definitions.yaml     |   2 +-
 .../resources/seed/destination_specs.yaml | 103 +++++++++++++++++-
 connectors.md                             |   2 +-
 3 files changed, 104 insertions(+), 3 deletions(-)

diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml
index 3a9b5e19f744..6a9e600bf269 100644
--- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml
+++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml
@@ -290,7 +290,7 @@
 - name: Redshift
   destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
   dockerRepository: airbyte/destination-redshift
-  dockerImageTag: 0.3.56
+  dockerImageTag: 0.4.0
   documentationUrl: https://docs.airbyte.com/integrations/destinations/redshift
   icon: redshift.svg
   normalizationConfig:
diff --git a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml
index 485bac02c911..c112f3cea371 100644
--- a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml
+++ b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml
@@ -5055,7 +5055,7 @@
       supported_destination_sync_modes:
       - "overwrite"
       - "append"
-- dockerImage: "airbyte/destination-redshift:0.3.56"
+- dockerImage: "airbyte/destination-redshift:0.4.0"
   spec:
     documentationUrl: "https://docs.airbyte.com/integrations/destinations/redshift"
     connectionSpecification:
@@ -5282,6 +5282,107 @@
            \ has deteriorating effects"
          examples:
          - "10"
+        tunnel_method:
+          type: "object"
+          title: "SSH Tunnel Method"
+          description: "Whether to initiate an SSH tunnel before connecting to the\
+            \ database, and if so, which kind of authentication to use."
+          oneOf:
+          - title: "No Tunnel"
+            required:
+            - "tunnel_method"
+            properties:
+              tunnel_method:
+                description: "No ssh tunnel needed to connect to database"
+                type: "string"
+                const: "NO_TUNNEL"
+                order: 0
+          - title: "SSH Key Authentication"
+            required:
+            - "tunnel_method"
+            - "tunnel_host"
+            - "tunnel_port"
+            - "tunnel_user"
+            - "ssh_key"
+            properties:
+              tunnel_method:
+                description: "Connect through a jump server tunnel host using username\
+                  \ and ssh key"
+                type: "string"
+                const: "SSH_KEY_AUTH"
+                order: 0
+              tunnel_host:
+                title: "SSH Tunnel Jump Server Host"
+                description: "Hostname of the jump server host that allows inbound\
+                  \ ssh tunnel."
+                type: "string"
+                order: 1
+              tunnel_port:
+                title: "SSH Connection Port"
+                description: "Port on the proxy/jump server that accepts inbound ssh\
+                  \ connections."
+                type: "integer"
+                minimum: 0
+                maximum: 65536
+                default: 22
+                examples:
+                - "22"
+                order: 2
+              tunnel_user:
+                title: "SSH Login Username"
+                description: "OS-level username for logging into the jump server host."
+                type: "string"
+                order: 3
+              ssh_key:
+                title: "SSH Private Key"
+                description: "OS-level user account ssh key credentials in RSA PEM\
+                  \ format ( created with ssh-keygen -t rsa -m PEM -f myuser_rsa )"
+                type: "string"
+                airbyte_secret: true
+                multiline: true
+                order: 4
+          - title: "Password Authentication"
+            required:
+            - "tunnel_method"
+            - "tunnel_host"
+            - "tunnel_port"
+            - "tunnel_user"
+            - "tunnel_user_password"
+            properties:
+              tunnel_method:
+                description: "Connect through a jump server tunnel host using username\
+                  \ and password authentication"
+                type: "string"
+                const: "SSH_PASSWORD_AUTH"
+                order: 0
+              tunnel_host:
+                title: "SSH Tunnel Jump Server Host"
+                description: "Hostname of the jump server host that allows inbound\
+                  \ ssh tunnel."
+                type: "string"
+                order: 1
+              tunnel_port:
+                title: "SSH Connection Port"
+                description: "Port on the proxy/jump server that accepts inbound ssh\
+                  \ connections."
+                type: "integer"
+                minimum: 0
+                maximum: 65536
+                default: 22
+                examples:
+                - "22"
+                order: 2
+              tunnel_user:
+                title: "SSH Login Username"
+                description: "OS-level username for logging into the jump server host"
+                type: "string"
+                order: 3
+              tunnel_user_password:
+                title: "Password"
+                description: "OS-level password for logging into the jump server host"
+                type: "string"
+                airbyte_secret: true
+                order: 4
     supportsIncremental: true
     supportsNormalization: true
     supportsDBT: true
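To make the new spec concrete, here is a sketch of how a destination config exercising the "SSH Key Authentication" option could be assembled with Jackson. Only the tunnel_method keys are taken from the spec above; the top-level Redshift connection keys and every value are placeholders, not field names or credentials confirmed by this patch.

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

final class SshTunnelConfigSketch {

  static ObjectNode exampleConfig() {
    final ObjectMapper mapper = new ObjectMapper();

    // Mirrors the "SSH Key Authentication" branch of the tunnel_method oneOf above.
    final ObjectNode tunnel = mapper.createObjectNode()
        .put("tunnel_method", "SSH_KEY_AUTH")
        .put("tunnel_host", "bastion.example.com")
        .put("tunnel_port", 22)
        .put("tunnel_user", "airbyte")
        .put("ssh_key", "-----BEGIN RSA PRIVATE KEY-----\n...\n-----END RSA PRIVATE KEY-----");

    // Placeholder top-level connection settings for the destination itself.
    final ObjectNode config = mapper.createObjectNode()
        .put("host", "example-cluster.abc123.us-west-2.redshift.amazonaws.com")
        .put("port", 5439)
        .put("database", "dev")
        .put("schema", "public")
        .put("username", "airbyte_user")
        .put("password", "replace-me");
    config.set("tunnel_method", tunnel);
    return config;
  }
}

Choosing "No Tunnel" reduces tunnel_method to the single NO_TUNNEL constant, while "Password Authentication" swaps ssh_key for tunnel_user_password, as the oneOf branches above require.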
+ type: "integer" + minimum: 0 + maximum: 65536 + default: 22 + examples: + - "22" + order: 2 + tunnel_user: + title: "SSH Login Username" + description: "OS-level username for logging into the jump server host" + type: "string" + order: 3 + tunnel_user_password: + title: "Password" + description: "OS-level password for logging into the jump server host" + type: "string" + airbyte_secret: true + order: 4 supportsIncremental: true supportsNormalization: true supportsDBT: true diff --git a/connectors.md b/connectors.md index cf215cb12c10..e24fb2614dad 100644 --- a/connectors.md +++ b/connectors.md @@ -319,7 +319,7 @@ | **RabbitMQ** | RabbitMQ icon | Destination | airbyte/destination-rabbitmq:0.1.1 | alpha | [link](https://docs.airbyte.com/integrations/destinations/rabbitmq) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/destination-rabbitmq) | `e06ad785-ad6f-4647-b2e8-3027a5c59454` | | **Redis** | Redis icon | Destination | airbyte/destination-redis:0.1.4 | alpha | [link](https://docs.airbyte.com/integrations/destinations/redis) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/destination-redis) | `d4d3fef9-e319-45c2-881a-bd02ce44cc9f` | | **Redpanda** | Redpanda icon | Destination | airbyte/destination-redpanda:0.1.0 | alpha | [link](https://docs.airbyte.com/integrations/destinations/redpanda) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/destination-redpanda) | `825c5ee3-ed9a-4dd1-a2b6-79ed722f7b13` | -| **Redshift** | Redshift icon | Destination | airbyte/destination-redshift:0.3.56 | beta | [link](https://docs.airbyte.com/integrations/destinations/redshift) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/destination-redshift) | `f7a7d195-377f-cf5b-70a5-be6b819019dc` | +| **Redshift** | Redshift icon | Destination | airbyte/destination-redshift:0.4.0 | beta | [link](https://docs.airbyte.com/integrations/destinations/redshift) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/destination-redshift) | `f7a7d195-377f-cf5b-70a5-be6b819019dc` | | **Rockset** | x | Destination | airbyte/destination-rockset:0.1.4 | alpha | [link](https://docs.airbyte.com/integrations/destinations/rockset) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/destination-rockset) | `2c9d93a7-9a17-4789-9de9-f46f0097eb70` | | **S3** | S3 icon | Destination | airbyte/destination-s3:0.3.20 | generally_available | [link](https://docs.airbyte.com/integrations/destinations/s3) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/destination-s3) | `4816b78f-1489-44c1-9060-4b19d5fa9362` | | **S3 Glue** | S3 Glue icon | Destination | airbyte/destination-s3-glue:0.1.2 | alpha | [link](https://docs.airbyte.com/integrations/destinations/s3-glue) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/destination-s3-glue) | `471e5cab-8ed1-49f3-ba11-79c687784737` |