Skip to content

Commit

Permalink
Redshift Destination: Add SSH Tunnelling Config Option
Browse files Browse the repository at this point in the history
Issue: #12131

User would like to be able to access their Redshift Destination database via an SSH Tunnelling config (SSH Bastion Host), just as in Postgres, etc. destinations.

Acceptance tests pass and exercise both Insert and S3 type loading options as well as password and key based authentication with the SSH bastion host.

Airbyters: Credentials/configs for acceptance testing against AWS Redshift are updated in Google Cloud Secret Manager. Be sure to get both the config.json and config_staging.json as well as the new PEM file
  • Loading branch information
jcowanpdx committed Feb 28, 2023
1 parent 33eed67 commit bd08340
Show file tree
Hide file tree
Showing 6 changed files with 275 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,16 @@
public class RedshiftDestination extends SwitchingDestination<RedshiftDestination.DestinationType> {

private static final Logger LOGGER = LoggerFactory.getLogger(RedshiftDestination.class);
private static final String METHOD = "method";

private static final Map<DestinationType, Destination> destinationMap = Map.of(
DestinationType.STANDARD, new RedshiftInsertDestination(),
DestinationType.COPY_S3, new RedshiftStagingS3Destination());

enum DestinationType {
STANDARD,
COPY_S3
}

private static final Map<DestinationType, Destination> destinationMap = Map.of(
DestinationType.STANDARD, RedshiftInsertDestination.sshWrappedDestination(),
DestinationType.COPY_S3, RedshiftStagingS3Destination.sshWrappedDestination());

// Routes each sync to the concrete destination (standard INSERT vs. S3 staging copy)
// selected from the config by getTypeFromConfig.
public RedshiftDestination() {
super(DestinationType.class, RedshiftDestination::getTypeFromConfig, destinationMap);
}
Expand All @@ -47,7 +46,6 @@ private static DestinationType getTypeFromConfig(final JsonNode config) {
}

public static DestinationType determineUploadMode(final JsonNode config) {

final JsonNode jsonNode = findS3Options(config);

if (anyOfS3FieldsAreNullOrEmpty(jsonNode)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import io.airbyte.db.jdbc.DefaultJdbcDatabase;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.ssh.SshWrappedDestination;
import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination;
import io.airbyte.integrations.destination.redshift.operations.RedshiftSqlOperations;
import java.util.Map;
Expand All @@ -26,6 +28,10 @@ public class RedshiftInsertDestination extends AbstractJdbcDestination {
JdbcUtils.SSL_KEY, "true",
"sslfactory", "com.amazon.redshift.ssl.NonValidatingFactory");

/**
 * Wraps the plain INSERT destination in an {@link SshWrappedDestination} so that, when the config
 * requests it, all database traffic is routed through an SSH tunnel (bastion host). The host/port
 * keys tell the wrapper which config fields to rewrite to point at the local tunnel endpoint.
 */
public static Destination sshWrappedDestination() {
return new SshWrappedDestination(new RedshiftInsertDestination(), JdbcUtils.HOST_LIST_KEY, JdbcUtils.PORT_LIST_KEY);
}

public RedshiftInsertDestination() {
super(DRIVER_CLASS, new RedshiftSQLNameTransformer(), new RedshiftSqlOperations());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.AirbyteTraceMessageUtility;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.ssh.SshWrappedDestination;
import io.airbyte.integrations.destination.NamingConventionTransformer;
import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination;
import io.airbyte.integrations.destination.record_buffer.FileBuffer;
Expand Down Expand Up @@ -50,6 +51,10 @@ public class RedshiftStagingS3Destination extends AbstractJdbcDestination implem

private static final Logger LOGGER = LoggerFactory.getLogger(RedshiftStagingS3Destination.class);

/**
 * Wraps the S3-staging destination in an {@link SshWrappedDestination} so that, when the config
 * requests it, the JDBC connection to Redshift is routed through an SSH tunnel (bastion host),
 * mirroring {@code RedshiftInsertDestination.sshWrappedDestination()}.
 */
public static Destination sshWrappedDestination() {
return new SshWrappedDestination(new RedshiftStagingS3Destination(), JdbcUtils.HOST_LIST_KEY, JdbcUtils.PORT_LIST_KEY);
}

public RedshiftStagingS3Destination() {
super(RedshiftInsertDestination.DRIVER_CLASS, new RedshiftSQLNameTransformer(), new RedshiftSqlOperations());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.airbyte.integrations.destination.redshift;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.ssh.SshTunnel.TunnelMethod;
import java.io.IOException;
import java.nio.file.Path;

/**
 * Runs the basic Redshift destination acceptance tests using the SQL INSERT upload mechanism and
 * key-based authentication for the SSH bastion tunnel.
 *
 * <p>Check Google Secrets for a secret config called "SECRET_DESTINATION-REDSHIFT-BASTION__KEY" and
 * place its contents in your secrets directory under 'redshift-bastion-key-pair.pem'.
 */
public class SshKeyRedshiftInsertDestinationAcceptanceTest extends SshRedshiftDestinationBaseAcceptanceTest {

  @Override
  public TunnelMethod getTunnelMethod() {
    return TunnelMethod.SSH_KEY_AUTH;
  }

  // @Override added: this implements the abstract getStaticConfig() declared in the base class,
  // matching the sibling password-auth test and letting the compiler catch signature drift.
  @Override
  public JsonNode getStaticConfig() throws IOException {
    final Path configPath = Path.of("secrets/config.json");
    final String configAsString = IOs.readFile(configPath);
    return Jsons.deserialize(configAsString);
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package io.airbyte.integrations.destination.redshift;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.ssh.SshTunnel.TunnelMethod;
import java.io.IOException;
import java.nio.file.Path;

/**
 * Runs the basic Redshift destination acceptance tests using the S3 staging upload mechanism and
 * password-based authentication for the SSH bastion tunnel.
 */
public class SshPasswordRedshiftStagingDestinationAcceptanceTest extends SshRedshiftDestinationBaseAcceptanceTest {

  @Override
  public TunnelMethod getTunnelMethod() {
    return TunnelMethod.SSH_PASSWORD_AUTH;
  }

  // Loads the checked-in staging config from the secrets directory.
  @Override
  public JsonNode getStaticConfig() throws IOException {
    return Jsons.deserialize(IOs.readFile(Path.of("secrets/config_staging.json")));
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
package io.airbyte.integrations.destination.redshift;

import static io.airbyte.integrations.base.ssh.SshTunnel.TunnelMethod.SSH_KEY_AUTH;
import static io.airbyte.integrations.base.ssh.SshTunnel.TunnelMethod.SSH_PASSWORD_AUTH;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMap.Builder;
import io.airbyte.commons.jackson.MoreMappers;
import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;
import java.util.Map;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
import io.airbyte.db.Database;
import io.airbyte.db.factory.DSLContextFactory;
import io.airbyte.db.factory.DatabaseDriver;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.base.ssh.SshTunnel;
import io.airbyte.integrations.destination.redshift.operations.RedshiftSqlOperations;
import io.airbyte.integrations.standardtest.destination.JdbcDestinationAcceptanceTest;
import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;

/**
 * Base class for the SSH-tunnelled Redshift destination acceptance tests. Concrete subclasses
 * choose the tunnel authentication method (key vs. password) via {@link #getTunnelMethod()} and
 * supply the static connector config via {@link #getStaticConfig()}.
 */
public abstract class SshRedshiftDestinationBaseAcceptanceTest extends JdbcDestinationAcceptanceTest {

  protected String schemaName;
  // config from which to create / delete schemas.
  protected JsonNode baseConfig;
  // config which refers to the schema that the test is being run in.
  protected JsonNode config;

  private final RedshiftSQLNameTransformer namingResolver = new RedshiftSQLNameTransformer();
  // Randomized so concurrent test runs against the shared cluster don't collide on the user name.
  private final String USER_WITHOUT_CREDS = Strings.addRandomSuffix("test_user", "_", 5);

  /** The SSH tunnel authentication method this test run exercises. */
  public abstract SshTunnel.TunnelMethod getTunnelMethod();

  @Override
  protected String getImageName() {
    return "airbyte/destination-redshift:dev";
  }

  @Override
  protected JsonNode getConfig() throws Exception {
    final Map<Object, Object> configAsMap = deserializeToObjectMap(config);
    final Builder<Object, Object> configMapBuilder = new Builder<>().putAll(configAsMap);
    return getTunnelConfig(getTunnelMethod(), configMapBuilder);
  }

  /**
   * Appends a "tunnel_method" section to the supplied config builder, populated from the
   * ssh_bastion_* fields of the static config.
   *
   * @param tunnelMethod which SSH authentication method to configure.
   * @param builderWithSchema builder already holding the base connector config.
   * @return the full connector config as JSON, including the tunnel settings.
   */
  protected JsonNode getTunnelConfig(final SshTunnel.TunnelMethod tunnelMethod, final ImmutableMap.Builder<Object, Object> builderWithSchema) {
    final JsonNode sshBastionHost = config.get("ssh_bastion_host");
    final JsonNode sshBastionPort = config.get("ssh_bastion_port");
    final JsonNode sshBastionUser = config.get("ssh_bastion_user");
    final JsonNode sshBastionPassword = config.get("ssh_bastion_password");

    final String tunnelUserPassword = tunnelMethod.equals(SSH_PASSWORD_AUTH) ? sshBastionPassword.asText() : "";
    // Fix: read the PEM key only when key auth is actually in use, so password-auth test runs do
    // not fail just because secrets/redshift-bastion-key-pair.pem is absent.
    final String sshKey = tunnelMethod.equals(SSH_KEY_AUTH)
        ? IOs.readFile(Path.of("secrets/redshift-bastion-key-pair.pem"))
        : "";

    return Jsons.jsonNode(builderWithSchema
        .put("tunnel_method", Jsons.jsonNode(ImmutableMap.builder()
            .put("tunnel_host", sshBastionHost)
            .put("tunnel_method", tunnelMethod.toString())
            .put("tunnel_port", sshBastionPort.intValue())
            .put("tunnel_user", sshBastionUser)
            .put("tunnel_user_password", tunnelUserPassword)
            .put("ssh_key", sshKey)
            .build()))
        .build());
  }

  /** Converts a JSON object into a plain map so it can be fed to an ImmutableMap.Builder. */
  public static Map<Object, Object> deserializeToObjectMap(final JsonNode json) {
    final ObjectMapper objectMapper = MoreMappers.initMapper();
    return objectMapper.convertValue(json, new TypeReference<>() {
    });
  }

  /** Loads the static (secrets-backed) connector config for this test variant. */
  public abstract JsonNode getStaticConfig() throws IOException;

  @Override
  protected JsonNode getFailCheckConfig() {
    final JsonNode invalidConfig = Jsons.clone(config);
    ((ObjectNode) invalidConfig).put("password", "wrong password");
    return invalidConfig;
  }

  @Override
  protected boolean implementsNamespaces() {
    return true;
  }

  @Override
  protected List<JsonNode> retrieveNormalizedRecords(final TestDestinationEnv env, final String streamName, final String namespace)
      throws Exception {
    final String tableName = namingResolver.getIdentifier(streamName);
    return retrieveRecordsFromTable(tableName, namespace);
  }

  @Override
  protected List<JsonNode> retrieveRecords(final TestDestinationEnv testEnv, final String streamName, final String namespace,
                                           final JsonNode streamSchema) throws Exception {
    // Raw tables store the payload under the _airbyte_data column; unwrap it for comparison.
    return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namespace)
        .stream()
        .map(j -> j.get(JavaBaseConstants.COLUMN_NAME_DATA))
        .collect(Collectors.toList());
  }

  // Queries the given table through the SSH tunnel; ordered by emitted_at for stable comparisons.
  private List<JsonNode> retrieveRecordsFromTable(final String tableName, final String schemaName) throws Exception {
    return SshTunnel.sshWrap(
        getConfig(),
        JdbcUtils.HOST_LIST_KEY,
        JdbcUtils.PORT_LIST_KEY,
        config -> {
          return getDatabaseFromConfig(config).query(ctx -> ctx
              .fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
              .stream()
              .map(this::getJsonFromRecord)
              .collect(Collectors.toList()));
        });
  }

  @Override
  protected TestDataComparator getTestDataComparator() {
    return new RedshiftTestDataComparator();
  }

  // Builds a jOOQ Database from a (tunnel-rewritten) config; SSL parameters are required by Redshift.
  private static Database getDatabaseFromConfig(final JsonNode config) {
    return new Database(
        DSLContextFactory.create(
            config.get(JdbcUtils.USERNAME_KEY).asText(),
            config.get(JdbcUtils.PASSWORD_KEY).asText(),
            DatabaseDriver.REDSHIFT.getDriverClassName(),
            String.format(DatabaseDriver.REDSHIFT.getUrlFormatString(),
                config.get(JdbcUtils.HOST_KEY).asText(),
                config.get(JdbcUtils.PORT_KEY).asInt(),
                config.get(JdbcUtils.DATABASE_KEY).asText()),
            null,
            RedshiftInsertDestination.SSL_JDBC_PARAMETERS));
  }

  @Override
  protected int getMaxRecordValueLimit() {
    return RedshiftSqlOperations.REDSHIFT_VARCHAR_MAX_BYTE_SIZE;
  }

  @Override
  protected void setup(final TestDestinationEnv testEnv) throws Exception {
    baseConfig = getStaticConfig();
    final JsonNode configForSchema = Jsons.clone(baseConfig);
    schemaName = Strings.addRandomSuffix("integration_test", "_", 5);
    ((ObjectNode) configForSchema).put("schema", schemaName);
    config = configForSchema;

    // create the schema
    SshTunnel.sshWrap(
        getConfig(),
        JdbcUtils.HOST_LIST_KEY,
        JdbcUtils.PORT_LIST_KEY,
        config -> {
          getDatabaseFromConfig(config).query(ctx -> ctx.fetch(String.format("CREATE SCHEMA %s;", schemaName)));
        });

    // create the user
    SshTunnel.sshWrap(
        getConfig(),
        JdbcUtils.HOST_LIST_KEY,
        JdbcUtils.PORT_LIST_KEY,
        config -> {
          getDatabaseFromConfig(config).query(ctx -> ctx.fetch(String.format("CREATE USER %s WITH PASSWORD '%s';",
              USER_WITHOUT_CREDS, baseConfig.get("password").asText())));
        });
  }

  @Override
  protected void tearDown(final TestDestinationEnv testEnv) throws Exception {
    // blow away the test schema at the end.
    SshTunnel.sshWrap(
        getConfig(),
        JdbcUtils.HOST_LIST_KEY,
        JdbcUtils.PORT_LIST_KEY,
        config -> {
          getDatabaseFromConfig(config).query(ctx -> ctx.fetch(String.format("DROP SCHEMA IF EXISTS %s CASCADE;", schemaName)));
        });

    // blow away the user at the end.
    SshTunnel.sshWrap(
        getConfig(),
        JdbcUtils.HOST_LIST_KEY,
        JdbcUtils.PORT_LIST_KEY,
        config -> {
          getDatabaseFromConfig(config).query(ctx -> ctx.fetch(String.format("DROP USER IF EXISTS %s;", USER_WITHOUT_CREDS)));
        });
  }

}

0 comments on commit bd08340

Please sign in to comment.