diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftDestination.java b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftDestination.java index b00d965b14bc..041221d19e16 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftDestination.java +++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftDestination.java @@ -27,17 +27,16 @@ public class RedshiftDestination extends SwitchingDestination { private static final Logger LOGGER = LoggerFactory.getLogger(RedshiftDestination.class); - private static final String METHOD = "method"; - - private static final Map destinationMap = Map.of( - DestinationType.STANDARD, new RedshiftInsertDestination(), - DestinationType.COPY_S3, new RedshiftStagingS3Destination()); enum DestinationType { STANDARD, COPY_S3 } + private static final Map destinationMap = Map.of( + DestinationType.STANDARD, RedshiftInsertDestination.sshWrappedDestination(), + DestinationType.COPY_S3, RedshiftStagingS3Destination.sshWrappedDestination()); + public RedshiftDestination() { super(DestinationType.class, RedshiftDestination::getTypeFromConfig, destinationMap); } @@ -47,7 +46,6 @@ private static DestinationType getTypeFromConfig(final JsonNode config) { } public static DestinationType determineUploadMode(final JsonNode config) { - final JsonNode jsonNode = findS3Options(config); if (anyOfS3FieldsAreNullOrEmpty(jsonNode)) { diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftInsertDestination.java b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftInsertDestination.java index 945bc5bb1663..08b9e21f3e27 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftInsertDestination.java +++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftInsertDestination.java @@ -13,6 +13,8 @@ import io.airbyte.db.jdbc.DefaultJdbcDatabase; import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.db.jdbc.JdbcUtils; +import io.airbyte.integrations.base.Destination; +import io.airbyte.integrations.base.ssh.SshWrappedDestination; import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination; import io.airbyte.integrations.destination.redshift.operations.RedshiftSqlOperations; import java.util.Map; @@ -26,6 +28,10 @@ public class RedshiftInsertDestination extends AbstractJdbcDestination { JdbcUtils.SSL_KEY, "true", "sslfactory", "com.amazon.redshift.ssl.NonValidatingFactory"); + public static Destination sshWrappedDestination() { + return new SshWrappedDestination(new RedshiftInsertDestination(), JdbcUtils.HOST_LIST_KEY, JdbcUtils.PORT_LIST_KEY); + } + public RedshiftInsertDestination() { super(DRIVER_CLASS, new RedshiftSQLNameTransformer(), new RedshiftSqlOperations()); } diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3Destination.java b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3Destination.java index 3e6f8dfe7d38..40d93ad028e0 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3Destination.java +++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3Destination.java @@ -22,6 +22,7 @@ import io.airbyte.integrations.base.AirbyteMessageConsumer; import io.airbyte.integrations.base.AirbyteTraceMessageUtility; import io.airbyte.integrations.base.Destination; +import io.airbyte.integrations.base.ssh.SshWrappedDestination; import io.airbyte.integrations.destination.NamingConventionTransformer; import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination; import io.airbyte.integrations.destination.record_buffer.FileBuffer; @@ -50,6 +51,10 @@ public class RedshiftStagingS3Destination extends AbstractJdbcDestination implem private static final Logger LOGGER = LoggerFactory.getLogger(RedshiftStagingS3Destination.class); + public static Destination sshWrappedDestination() { + return new SshWrappedDestination(new RedshiftStagingS3Destination(), JdbcUtils.HOST_LIST_KEY, JdbcUtils.PORT_LIST_KEY); + } + public RedshiftStagingS3Destination() { super(RedshiftInsertDestination.DRIVER_CLASS, new RedshiftSQLNameTransformer(), new RedshiftSqlOperations()); } diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshKeyRedshiftInsertDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshKeyRedshiftInsertDestinationAcceptanceTest.java new file mode 100644 index 000000000000..dba690de9214 --- /dev/null +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshKeyRedshiftInsertDestinationAcceptanceTest.java @@ -0,0 +1,29 @@ +package io.airbyte.integrations.destination.redshift; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.io.IOs; +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.base.ssh.SshTunnel.TunnelMethod; +import java.io.IOException; +import java.nio.file.Path; + +/* + * SshKeyRedshiftInsertDestinationAcceptanceTest runs basic Redshift Destination Tests using the SQL Insert + * mechanism for upload of data and "key" authentication for the SSH bastion configuration. + * + * Check Google Secrets for a new secret config called "SECRET_DESTINATION-REDSHIFT-BASTION__KEY". Place the contents + * in your secrets directory under 'redshift-bastion-key-pair.pem' + */ +public class SshKeyRedshiftInsertDestinationAcceptanceTest extends SshRedshiftDestinationBaseAcceptanceTest { + + @Override + public TunnelMethod getTunnelMethod() { + return TunnelMethod.SSH_KEY_AUTH; + } + + public JsonNode getStaticConfig() throws IOException { + final Path configPath = Path.of("secrets/config.json"); + final String configAsString = IOs.readFile(configPath); + return Jsons.deserialize(configAsString); + } +} diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshPasswordRedshiftStagingDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshPasswordRedshiftStagingDestinationAcceptanceTest.java new file mode 100644 index 000000000000..e9fedcee6e72 --- /dev/null +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshPasswordRedshiftStagingDestinationAcceptanceTest.java @@ -0,0 +1,27 @@ +package io.airbyte.integrations.destination.redshift; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.io.IOs; +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.base.ssh.SshTunnel.TunnelMethod; +import java.io.IOException; +import java.nio.file.Path; + +/* + * SshPasswordRedshiftStagingDestinationAcceptanceTest runs basic Redshift Destination Tests using the S3 Staging + * mechanism for upload of data and "password" authentication for the SSH bastion configuration. + */ +public class SshPasswordRedshiftStagingDestinationAcceptanceTest extends SshRedshiftDestinationBaseAcceptanceTest { + + @Override + public TunnelMethod getTunnelMethod() { + return TunnelMethod.SSH_PASSWORD_AUTH; + } + + @Override + public JsonNode getStaticConfig() throws IOException { + final Path configPath = Path.of("secrets/config_staging.json"); + final String configAsString = IOs.readFile(configPath); + return Jsons.deserialize(configAsString); + } +} diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshRedshiftDestinationBaseAcceptanceTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshRedshiftDestinationBaseAcceptanceTest.java new file mode 100644 index 000000000000..358fb8d99338 --- /dev/null +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshRedshiftDestinationBaseAcceptanceTest.java @@ -0,0 +1,204 @@ +package io.airbyte.integrations.destination.redshift; + +import static io.airbyte.integrations.base.ssh.SshTunnel.TunnelMethod.SSH_KEY_AUTH; +import static io.airbyte.integrations.base.ssh.SshTunnel.TunnelMethod.SSH_PASSWORD_AUTH; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableMap.Builder; +import io.airbyte.commons.jackson.MoreMappers; +import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator; +import java.util.Map; +import io.airbyte.commons.io.IOs; +import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.string.Strings; +import io.airbyte.db.Database; +import io.airbyte.db.factory.DSLContextFactory; +import io.airbyte.db.factory.DatabaseDriver; +import io.airbyte.db.jdbc.JdbcUtils; +import io.airbyte.integrations.base.JavaBaseConstants; +import io.airbyte.integrations.base.ssh.SshTunnel; +import io.airbyte.integrations.destination.redshift.operations.RedshiftSqlOperations; +import io.airbyte.integrations.standardtest.destination.JdbcDestinationAcceptanceTest; +import java.io.IOException; +import java.nio.file.Path; +import java.util.List; +import java.util.stream.Collectors; + +public abstract class SshRedshiftDestinationBaseAcceptanceTest extends JdbcDestinationAcceptanceTest { + + protected String schemaName; + // config from which to create / delete schemas. + protected JsonNode baseConfig; + // config which refers to the schema that the test is being run in. + protected JsonNode config; + + private final RedshiftSQLNameTransformer namingResolver = new RedshiftSQLNameTransformer(); + private final String USER_WITHOUT_CREDS = Strings.addRandomSuffix("test_user", "_", 5); + + public abstract SshTunnel.TunnelMethod getTunnelMethod(); + + @Override + protected String getImageName() { + return "airbyte/destination-redshift:dev"; + } + + @Override + protected JsonNode getConfig() throws Exception { + final Map configAsMap = deserializeToObjectMap(config); + final Builder configMapBuilder = new Builder<>().putAll(configAsMap); + return getTunnelConfig(getTunnelMethod(), configMapBuilder); + } + + protected JsonNode getTunnelConfig(final SshTunnel.TunnelMethod tunnelMethod, final ImmutableMap.Builder builderWithSchema) { + final Path keyPath = Path.of("secrets/redshift-bastion-key-pair.pem"); + final String keyAsString = IOs.readFile(keyPath); + + final JsonNode sshBastionHost = config.get("ssh_bastion_host"); + final JsonNode sshBastionPort = config.get("ssh_bastion_port"); + final JsonNode sshBastionUser = config.get("ssh_bastion_user"); + final JsonNode sshBastionPassword = config.get("ssh_bastion_password"); + + final String tunnelUserPassword = tunnelMethod.equals(SSH_PASSWORD_AUTH) ? sshBastionPassword.asText() : ""; + final String sshKey = tunnelMethod.equals(SSH_KEY_AUTH) ? keyAsString : ""; + + return Jsons.jsonNode(builderWithSchema + .put("tunnel_method", Jsons.jsonNode(ImmutableMap.builder() + .put("tunnel_host", sshBastionHost) + .put("tunnel_method", tunnelMethod.toString()) + .put("tunnel_port", sshBastionPort.intValue()) + .put("tunnel_user", sshBastionUser) + .put("tunnel_user_password", tunnelUserPassword) + .put("ssh_key", sshKey) + .build())) + .build()); + } + + public static Map deserializeToObjectMap(final JsonNode json) { + final ObjectMapper objectMapper = MoreMappers.initMapper(); + return objectMapper.convertValue(json, new TypeReference<>() { + }); + } + + public abstract JsonNode getStaticConfig() throws IOException; + + @Override + protected JsonNode getFailCheckConfig() { + final JsonNode invalidConfig = Jsons.clone(config); + ((ObjectNode) invalidConfig).put("password", "wrong password"); + return invalidConfig; + } + + @Override + protected boolean implementsNamespaces() { + return true; + } + + @Override + protected List retrieveNormalizedRecords(final TestDestinationEnv env, final String streamName, final String namespace) + throws Exception { + final String tableName = namingResolver.getIdentifier(streamName); + return retrieveRecordsFromTable(tableName, namespace); + } + + @Override + protected List retrieveRecords(final TestDestinationEnv testEnv, final String streamName, final String namespace, + final JsonNode streamSchema) throws Exception { + return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namespace) + .stream() + .map(j -> j.get(JavaBaseConstants.COLUMN_NAME_DATA)) + .collect(Collectors.toList()); + } + + private List retrieveRecordsFromTable(final String tableName, final String schemaName) throws Exception { + return SshTunnel.sshWrap( + getConfig(), + JdbcUtils.HOST_LIST_KEY, + JdbcUtils.PORT_LIST_KEY, + config -> { + return getDatabaseFromConfig(config).query(ctx -> ctx + .fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT)) + .stream() + .map(this::getJsonFromRecord) + .collect(Collectors.toList())); + }); + + } + + @Override + protected TestDataComparator getTestDataComparator() { + return new RedshiftTestDataComparator(); + } + + private static Database getDatabaseFromConfig(final JsonNode config) { + return new Database( + DSLContextFactory.create( + config.get(JdbcUtils.USERNAME_KEY).asText(), + config.get(JdbcUtils.PASSWORD_KEY).asText(), + DatabaseDriver.REDSHIFT.getDriverClassName(), + String.format(DatabaseDriver.REDSHIFT.getUrlFormatString(), + config.get(JdbcUtils.HOST_KEY).asText(), + config.get(JdbcUtils.PORT_KEY).asInt(), + config.get(JdbcUtils.DATABASE_KEY).asText()), + null, + RedshiftInsertDestination.SSL_JDBC_PARAMETERS)); + } + + @Override + protected int getMaxRecordValueLimit() { + return RedshiftSqlOperations.REDSHIFT_VARCHAR_MAX_BYTE_SIZE; + } + + @Override + protected void setup(final TestDestinationEnv testEnv) throws Exception { + baseConfig = getStaticConfig(); + final JsonNode configForSchema = Jsons.clone(baseConfig); + schemaName = Strings.addRandomSuffix("integration_test", "_", 5); + ((ObjectNode) configForSchema).put("schema", schemaName); + config = configForSchema; + + // create the schema + SshTunnel.sshWrap( + getConfig(), + JdbcUtils.HOST_LIST_KEY, + JdbcUtils.PORT_LIST_KEY, + config -> { + getDatabaseFromConfig(config).query(ctx -> ctx.fetch(String.format("CREATE SCHEMA %s;", schemaName))); + }); + + // create the user + SshTunnel.sshWrap( + getConfig(), + JdbcUtils.HOST_LIST_KEY, + JdbcUtils.PORT_LIST_KEY, + config -> { + getDatabaseFromConfig(config).query(ctx -> ctx.fetch(String.format("CREATE USER %s WITH PASSWORD '%s';", + USER_WITHOUT_CREDS, baseConfig.get("password").asText()))); + }); + } + + @Override + protected void tearDown(final TestDestinationEnv testEnv) throws Exception { + // blow away the test schema at the end. + SshTunnel.sshWrap( + getConfig(), + JdbcUtils.HOST_LIST_KEY, + JdbcUtils.PORT_LIST_KEY, + config -> { + getDatabaseFromConfig(config).query(ctx -> ctx.fetch(String.format("DROP SCHEMA IF EXISTS %s CASCADE;", schemaName))); + }); + + // blow away the user at the end. + SshTunnel.sshWrap( + getConfig(), + JdbcUtils.HOST_LIST_KEY, + JdbcUtils.PORT_LIST_KEY, + config -> { + getDatabaseFromConfig(config).query(ctx -> ctx.fetch(String.format("DROP USER IF EXISTS %s;", USER_WITHOUT_CREDS))); + }); + } + +}