Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Redshift Destination: Add SSH Tunnelling Config Option #23523

Merged
merged 20 commits into from
Mar 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
62e7194
Redshift Destination: Add SSH Tunnelling Config Option
jcowanpdx Feb 27, 2023
6cd1cf5
Merge branch 'master' into jeff/add-ssh-tunnel-to-redshift-destination
jcowanpdx Feb 28, 2023
bd08340
Redshift Destination: Add SSH Tunnelling Config Option
jcowanpdx Feb 27, 2023
6463556
Merge branch 'jeff/add-ssh-tunnel-to-redshift-destination' of github.…
jcowanpdx Feb 28, 2023
e7673ac
README.md updates
jcowanpdx Feb 28, 2023
6c82cbe
Move the pem info into the secrets config files.
jcowanpdx Feb 28, 2023
360e473
bump to 0.4.0
jcowanpdx Feb 28, 2023
bbd94a7
Merge branch 'master' into jeff/add-ssh-tunnel-to-redshift-destination
jcowanpdx Feb 28, 2023
c59f0aa
Merge branch 'master' into jeff/add-ssh-tunnel-to-redshift-destination
jcowanpdx Mar 2, 2023
0c933ec
adding a note about S3 staging NOT using the SSH Tunnel configuration…
jcowanpdx Mar 2, 2023
862ec5d
Merge branch 'jeff/add-ssh-tunnel-to-redshift-destination' of github.…
jcowanpdx Mar 2, 2023
b684cd0
Merge branch 'master' into jeff/add-ssh-tunnel-to-redshift-destination
jcowanpdx Mar 2, 2023
c527595
Merge branch 'master' into jeff/add-ssh-tunnel-to-redshift-destination
jcowanpdx Mar 3, 2023
919408e
Merge branch 'master' into jeff/add-ssh-tunnel-to-redshift-destination
jcowanpdx Mar 3, 2023
c915214
limit idle connection time for redshift users
jcowanpdx Mar 6, 2023
dbaf59e
make base class abstract to prevent double running the same tests
jcowanpdx Mar 6, 2023
acc8cee
auto-bump connector version
octavia-squidington-iii Mar 6, 2023
b436e01
Merge branch 'master' into jeff/add-ssh-tunnel-to-redshift-destination
jcowanpdx Mar 7, 2023
b1addc6
Merge branch 'master' into jeff/add-ssh-tunnel-to-redshift-destination
jcowanpdx Mar 7, 2023
2253606
Merge branch 'master' into jeff/add-ssh-tunnel-to-redshift-destination
jcowanpdx Mar 7, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@
- name: Redshift
destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
dockerRepository: airbyte/destination-redshift
dockerImageTag: 0.3.56
dockerImageTag: 0.4.0
documentationUrl: https://docs.airbyte.com/integrations/destinations/redshift
icon: redshift.svg
normalizationConfig:
Expand Down
103 changes: 102 additions & 1 deletion airbyte-config/init/src/main/resources/seed/destination_specs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5055,7 +5055,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-redshift:0.3.56"
- dockerImage: "airbyte/destination-redshift:0.4.0"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/destinations/redshift"
connectionSpecification:
Expand Down Expand Up @@ -5282,6 +5282,107 @@
\ has deteriorating effects"
examples:
- "10"
tunnel_method:
type: "object"
title: "SSH Tunnel Method"
description: "Whether to initiate an SSH tunnel before connecting to the\
\ database, and if so, which kind of authentication to use."
oneOf:
- title: "No Tunnel"
required:
- "tunnel_method"
properties:
tunnel_method:
description: "No ssh tunnel needed to connect to database"
type: "string"
const: "NO_TUNNEL"
order: 0
- title: "SSH Key Authentication"
required:
- "tunnel_method"
- "tunnel_host"
- "tunnel_port"
- "tunnel_user"
- "ssh_key"
properties:
tunnel_method:
description: "Connect through a jump server tunnel host using username\
\ and ssh key"
type: "string"
const: "SSH_KEY_AUTH"
order: 0
tunnel_host:
title: "SSH Tunnel Jump Server Host"
description: "Hostname of the jump server host that allows inbound\
\ ssh tunnel."
type: "string"
order: 1
tunnel_port:
title: "SSH Connection Port"
description: "Port on the proxy/jump server that accepts inbound ssh\
\ connections."
type: "integer"
minimum: 0
maximum: 65536
default: 22
examples:
- "22"
order: 2
tunnel_user:
title: "SSH Login Username"
description: "OS-level username for logging into the jump server host."
type: "string"
order: 3
ssh_key:
title: "SSH Private Key"
description: "OS-level user account ssh key credentials in RSA PEM\
\ format ( created with ssh-keygen -t rsa -m PEM -f myuser_rsa )"
type: "string"
airbyte_secret: true
multiline: true
order: 4
- title: "Password Authentication"
required:
- "tunnel_method"
- "tunnel_host"
- "tunnel_port"
- "tunnel_user"
- "tunnel_user_password"
properties:
tunnel_method:
description: "Connect through a jump server tunnel host using username\
\ and password authentication"
type: "string"
const: "SSH_PASSWORD_AUTH"
order: 0
tunnel_host:
title: "SSH Tunnel Jump Server Host"
description: "Hostname of the jump server host that allows inbound\
\ ssh tunnel."
type: "string"
order: 1
tunnel_port:
title: "SSH Connection Port"
description: "Port on the proxy/jump server that accepts inbound ssh\
\ connections."
type: "integer"
minimum: 0
maximum: 65536
default: 22
examples:
- "22"
order: 2
tunnel_user:
title: "SSH Login Username"
description: "OS-level username for logging into the jump server host"
type: "string"
order: 3
tunnel_user_password:
title: "Password"
description: "OS-level password for logging into the jump server host"
type: "string"
airbyte_secret: true
order: 4
supportsIncremental: true
supportsNormalization: true
supportsDBT: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION destination-redshift

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.3.56
LABEL io.airbyte.version=0.4.0
LABEL io.airbyte.name=airbyte/destination-redshift
34 changes: 18 additions & 16 deletions airbyte-integrations/connectors/destination-redshift/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,25 @@

This is the repository for the Redshift Destination Connector.

This connector can run in one of two modes:

- Direct Insert: using SQL to directly place data into tables.
- S3 Staging: Data files are uploaded to the customer's S3 and a load is done into the database from these files directly. This is a directly
supported feature of Redshift. Consult Redshift documentation for more information and permissions.

This connector can also connect to the database through an SSH tunnel (bastion host). This is useful for environments where Redshift
is not exposed to the public internet.

## Testing
The `AirbyteCopier.java` class contains an ignored local test class. This is ignored as
the necessary components cannot be mocked locally. The class requires credentials in order
to run all its tests.

Users have the option of either filling in the variables defined at the top of the class,
or filling in a `config.properties` file in `src/test/resources` with the following property keys:
```aidl
s3.keyId=<key_id>
s3.accessKey=<access_key>

redshift.connString=<conn_string>
redshift.user=<user>
redshift.pass=<pass>
```

Unit tests are run as usual.

Integration/Acceptance tests are run via the command line with secrets managed out of Google Cloud Secrets Manager.
Consult the integration test area for Redshift.

## Actual secrets
The actual secrets for integration tests could be found in Google Secrets Manager. It could be found by next labels:

The actual secrets for integration tests can be found in Google Cloud Secrets Manager. Search on redshift for the labels:

- SECRET_DESTINATION-REDSHIFT__CREDS - used for Standard tests. (__config.json__)
- SECRET_DESTINATION-REDSHIFT_STAGING__CREDS - used for S3 Staging tests. (__config_staging.json__)

Original file line number Diff line number Diff line change
Expand Up @@ -8,36 +8,37 @@
import static io.airbyte.integrations.destination.redshift.util.RedshiftUtil.findS3Options;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.destination.jdbc.copy.SwitchingDestination;
import io.airbyte.protocol.models.v0.ConnectorSpecification;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* The Redshift Destination offers two replication strategies. The first inserts via a typical SQL
* Insert statement. Although less efficient, this requires less user set up. See
* {@link RedshiftInsertDestination} for more detail. The second inserts via streaming the data to
* an S3 bucket, and Cop-ing the date into Redshift. This is more efficient, and recommended for
* production workloads, but does require users to set up an S3 bucket and pass in additional
* credentials. See {@link RedshiftStagingS3Destination} for more detail. This class inspect the
* given arguments to determine which strategy to use.
 * The Redshift Destination offers two replication strategies. The first inserts via a typical SQL Insert statement. Although less efficient, this
 * requires less user setup. See {@link RedshiftInsertDestination} for more detail. The second inserts via streaming the data to an S3 bucket, and
 * COPYing the data into Redshift. This is more efficient, and recommended for production workloads, but does require users to set up an S3 bucket and
 * pass in additional credentials. See {@link RedshiftStagingS3Destination} for more detail. This class inspects the given arguments to determine which
 * strategy to use.
*/
public class RedshiftDestination extends SwitchingDestination<RedshiftDestination.DestinationType> {

private static final Logger LOGGER = LoggerFactory.getLogger(RedshiftDestination.class);
private static final String METHOD = "method";

private static final Map<DestinationType, Destination> destinationMap = Map.of(
DestinationType.STANDARD, new RedshiftInsertDestination(),
DestinationType.COPY_S3, new RedshiftStagingS3Destination());

// Upload strategy selected from the connector config (see determineUploadMode).
enum DestinationType {
STANDARD, // direct SQL INSERT statements into Redshift
COPY_S3 // stage data files in S3, then COPY them into Redshift
}

private static final Map<DestinationType, Destination> destinationMap = Map.of(
DestinationType.STANDARD, RedshiftInsertDestination.sshWrappedDestination(),
DestinationType.COPY_S3, RedshiftStagingS3Destination.sshWrappedDestination());

// Delegates to SwitchingDestination: the config is inspected at runtime
// (via getTypeFromConfig) to pick the matching entry from destinationMap.
public RedshiftDestination() {
super(DestinationType.class, RedshiftDestination::getTypeFromConfig, destinationMap);
}
Expand All @@ -47,7 +48,6 @@ private static DestinationType getTypeFromConfig(final JsonNode config) {
}

public static DestinationType determineUploadMode(final JsonNode config) {

final JsonNode jsonNode = findS3Options(config);

if (anyOfS3FieldsAreNullOrEmpty(jsonNode)) {
Expand All @@ -58,6 +58,15 @@ public static DestinationType determineUploadMode(final JsonNode config) {
return DestinationType.COPY_S3;
}

@Override
public ConnectorSpecification spec() throws Exception {
edgao marked this conversation as resolved.
Show resolved Hide resolved
// inject the standard ssh configuration into the spec.
final ConnectorSpecification originalSpec = super.spec();
final ObjectNode propNode = (ObjectNode) originalSpec.getConnectionSpecification().get("properties");
propNode.set("tunnel_method", Jsons.deserialize(MoreResources.readResource("ssh-tunnel-spec.json")));
return originalSpec;
}

public static void main(final String[] args) throws Exception {
final Destination destination = new RedshiftDestination();
LOGGER.info("starting destination: {}", RedshiftDestination.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import io.airbyte.db.jdbc.DefaultJdbcDatabase;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.ssh.SshWrappedDestination;
import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination;
import io.airbyte.integrations.destination.redshift.operations.RedshiftSqlOperations;
import java.util.Map;
Expand All @@ -26,6 +28,10 @@ public class RedshiftInsertDestination extends AbstractJdbcDestination {
JdbcUtils.SSL_KEY, "true",
"sslfactory", "com.amazon.redshift.ssl.NonValidatingFactory");

/**
 * Factory that wraps the plain insert destination in SSH-tunnel support, using the
 * standard JDBC host/port config keys to locate the endpoint to tunnel to.
 */
public static Destination sshWrappedDestination() {
  final Destination delegate = new RedshiftInsertDestination();
  return new SshWrappedDestination(delegate, JdbcUtils.HOST_LIST_KEY, JdbcUtils.PORT_LIST_KEY);
}

// Configures the JDBC base with the Redshift driver, name transformer and
// Redshift-specific SQL operations.
public RedshiftInsertDestination() {
super(DRIVER_CLASS, new RedshiftSQLNameTransformer(), new RedshiftSqlOperations());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.AirbyteTraceMessageUtility;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.ssh.SshWrappedDestination;
import io.airbyte.integrations.destination.NamingConventionTransformer;
import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination;
import io.airbyte.integrations.destination.record_buffer.FileBuffer;
Expand Down Expand Up @@ -50,6 +51,10 @@ public class RedshiftStagingS3Destination extends AbstractJdbcDestination implem

private static final Logger LOGGER = LoggerFactory.getLogger(RedshiftStagingS3Destination.class);

/**
 * Factory that wraps the S3-staging destination in SSH-tunnel support, using the
 * standard JDBC host/port config keys to locate the endpoint to tunnel to.
 */
public static Destination sshWrappedDestination() {
  final Destination delegate = new RedshiftStagingS3Destination();
  return new SshWrappedDestination(delegate, JdbcUtils.HOST_LIST_KEY, JdbcUtils.PORT_LIST_KEY);
}

// Reuses the same driver, name transformer and SQL operations as the insert
// destination; the staging behavior is layered on top by this class.
public RedshiftStagingS3Destination() {
super(RedshiftInsertDestination.DRIVER_CLASS, new RedshiftSQLNameTransformer(), new RedshiftSqlOperations());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
* Integration test testing {@link RedshiftStagingS3Destination}. The default Redshift integration
* test credentials contain S3 credentials - this automatically causes COPY to be selected.
*/
public class RedshiftStagingS3DestinationAcceptanceTest extends JdbcDestinationAcceptanceTest {
public abstract class RedshiftStagingS3DestinationAcceptanceTest extends JdbcDestinationAcceptanceTest {

private static final Logger LOGGER = LoggerFactory.getLogger(RedshiftStagingS3DestinationAcceptanceTest.class);

Expand Down Expand Up @@ -204,7 +204,7 @@ protected void setup(final TestDestinationEnv testEnv) throws Exception {
final String createSchemaQuery = String.format("CREATE SCHEMA %s", schemaName);
baseConfig = getStaticConfig();
getDatabase().query(ctx -> ctx.execute(createSchemaQuery));
final String createUser = String.format("create user %s with password '%s';",
final String createUser = String.format("create user %s with password '%s' SESSION TIMEOUT 60;",
USER_WITHOUT_CREDS, baseConfig.get("password").asText());
getDatabase().query(ctx -> ctx.execute(createUser));
final JsonNode configForSchema = Jsons.clone(baseConfig);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.airbyte.integrations.destination.redshift;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.ssh.SshTunnel.TunnelMethod;
import java.io.IOException;
import java.nio.file.Path;

/**
 * Runs the basic Redshift destination acceptance tests using the SQL INSERT upload
 * mechanism and SSH key authentication for the bastion (jump server) configuration.
 */
public class SshKeyRedshiftInsertDestinationAcceptanceTest extends SshRedshiftDestinationBaseAcceptanceTest {

  @Override
  public TunnelMethod getTunnelMethod() {
    return TunnelMethod.SSH_KEY_AUTH;
  }

  // @Override added for consistency with the sibling password-auth test class and so
  // the compiler verifies this actually overrides the base-class method.
  @Override
  public JsonNode getStaticConfig() throws IOException {
    // Credentials are read from the local secrets directory (populated from GSM).
    final Path configPath = Path.of("secrets/config.json");
    final String configAsString = IOs.readFile(configPath);
    return Jsons.deserialize(configAsString);
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package io.airbyte.integrations.destination.redshift;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.ssh.SshTunnel.TunnelMethod;
import java.io.IOException;
import java.nio.file.Path;

/*
 * Runs the basic Redshift destination acceptance tests using the S3 staging upload
 * mechanism and password authentication for the SSH bastion configuration.
 */
public class SshPasswordRedshiftStagingDestinationAcceptanceTest extends SshRedshiftDestinationBaseAcceptanceTest {

  @Override
  public TunnelMethod getTunnelMethod() {
    return TunnelMethod.SSH_PASSWORD_AUTH;
  }

  @Override
  public JsonNode getStaticConfig() throws IOException {
    // Read the staging-flavored secrets file and parse it as JSON.
    return Jsons.deserialize(IOs.readFile(Path.of("secrets/config_staging.json")));
  }

}
Loading