Skip to content

Commit

Permalink
🎉 Destination Oracle - Added support for connection via ssh tunnel (#…
Browse files Browse the repository at this point in the history
…6370)

* Oracle destination with ssh tunneling

* add ssh key integration testing for Oracle destination

* fix checkstyle

* add container approach to Oracle Destination Integration Tests

* add container approach to Oracle Destination Integration Tests

* add container approach to Oracle Source Integration Tests/ delete secrets for Oracle db

* Oracle Destination - bump version to 0.1.8

* remove oracle ssh secrets from ci_credentials.sh

Co-authored-by: vmaltsev <[email protected]>
  • Loading branch information
VitaliiMaltsev and vmaltsev authored Sep 28, 2021
1 parent 8fa1571 commit c256114
Show file tree
Hide file tree
Showing 17 changed files with 375 additions and 28 deletions.
2 changes: 0 additions & 2 deletions .github/workflows/publish-command.yml
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,6 @@ jobs:
POSTGRES_SSH_PWD_TEST_CREDS: ${{ secrets.POSTGRES_SSH_PWD_TEST_CREDS }}
MYSQL_SSH_KEY_TEST_CREDS: ${{ secrets.MYSQL_SSH_KEY_TEST_CREDS }}
MYSQL_SSH_PWD_TEST_CREDS: ${{ secrets.MYSQL_SSH_PWD_TEST_CREDS }}
ORACLE_SSH_KEY_TEST_CREDS: ${{ secrets.ORACLE_SSH_KEY_TEST_CREDS }}
ORACLE_SSH_PWD_TEST_CREDS: ${{ secrets.ORACLE_SSH_PWD_TEST_CREDS }}
POSTHOG_TEST_CREDS: ${{ secrets.POSTHOG_TEST_CREDS }}
PIPEDRIVE_INTEGRATION_TESTS_CREDS: ${{ secrets.PIPEDRIVE_INTEGRATION_TESTS_CREDS }}
RECHARGE_INTEGRATION_TEST_CREDS: ${{ secrets.RECHARGE_INTEGRATION_TEST_CREDS }}
Expand Down
2 changes: 0 additions & 2 deletions .github/workflows/test-command.yml
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,6 @@ jobs:
POSTGRES_SSH_PWD_TEST_CREDS: ${{ secrets.POSTGRES_SSH_PWD_TEST_CREDS }}
MYSQL_SSH_KEY_TEST_CREDS: ${{ secrets.MYSQL_SSH_KEY_TEST_CREDS }}
MYSQL_SSH_PWD_TEST_CREDS: ${{ secrets.MYSQL_SSH_PWD_TEST_CREDS }}
ORACLE_SSH_KEY_TEST_CREDS: ${{ secrets.ORACLE_SSH_KEY_TEST_CREDS }}
ORACLE_SSH_PWD_TEST_CREDS: ${{ secrets.ORACLE_SSH_PWD_TEST_CREDS }}
POSTHOG_TEST_CREDS: ${{ secrets.POSTHOG_TEST_CREDS }}
PIPEDRIVE_INTEGRATION_TESTS_CREDS: ${{ secrets.PIPEDRIVE_INTEGRATION_TESTS_CREDS }}
RECHARGE_INTEGRATION_TEST_CREDS: ${{ secrets.RECHARGE_INTEGRATION_TEST_CREDS }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "3986776d-2319-4de9-8af8-db14c0996e72",
"name": "Oracle (Alpha)",
"dockerRepository": "airbyte/destination-oracle",
"dockerImageTag": "0.1.7",
"dockerImageTag": "0.1.8",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/oracle"
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
- destinationDefinitionId: 3986776d-2319-4de9-8af8-db14c0996e72
name: Oracle (Alpha)
dockerRepository: airbyte/destination-oracle
dockerImageTag: 0.1.7
dockerImageTag: 0.1.8
documentationUrl: https://docs.airbyte.io/integrations/destinations/oracle
- destinationDefinitionId: 9f760101-60ae-462f-9ee6-b7a9dafd454d
name: Kafka
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.7
LABEL io.airbyte.version=0.1.8
LABEL io.airbyte.name=airbyte/destination-oracle
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ dependencies {
implementation "com.oracle.database.jdbc:ojdbc8-production:19.7.0.0"

testImplementation 'org.apache.commons:commons-lang3:3.11'
testImplementation 'org.testcontainers:oracle-xe:1.15.2'

integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-oracle')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,17 @@
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.base.ssh.SshWrappedDestination;
import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OracleDestination extends AbstractJdbcDestination implements Destination {

private static final Logger LOGGER = LoggerFactory.getLogger(OracleDestination.class);
public static final List<String> HOST_KEY = List.of("host");
public static final List<String> PORT_KEY = List.of("port");

public static final String DRIVER_CLASS = "oracle.jdbc.OracleDriver";

Expand Down Expand Up @@ -46,7 +50,7 @@ public JsonNode toJdbcConfig(JsonNode config) {
}

public static void main(String[] args) throws Exception {
final Destination destination = new OracleDestination();
final Destination destination = new SshWrappedDestination(new OracleDestination(), HOST_KEY, PORT_KEY);
LOGGER.info("starting destination: {}", OracleDestination.class);
new IntegrationRunner(destination).run(args);
LOGGER.info("completed destination: {}", OracleDestination.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ public String convertStreamName(String input) {
if (!result.isEmpty() && result.charAt(0) == '_') {
result = result.substring(1);
}
return maxStringLength(result, 128);
// prior to Oracle version 12.2, identifiers are not allowed to exceed 30 characters in length.
// However, from version 12.2 they can be up to 128 bytes long. (Note: bytes, not characters).
return maxStringLength(result, 30);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ public void createSchemaIfNotExists(JdbcDatabase database, String schemaName) th
LOGGER.warn("Schema " + schemaName + " is not found! Trying to create a new one.");
final String query = String.format("create user %s identified by %s quota unlimited on %s",
schemaName, schemaName, tablespace);
// need to grant privileges to new user / this option is not mandatory for Oracle DB 18c or higher
final String privileges = String.format("GRANT ALL PRIVILEGES TO %s", schemaName);
database.execute(query);
database.execute(privileges);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.integrations.destination.oracle;

import io.airbyte.integrations.base.ssh.SshTunnel;

public class SshKeyOracleDestinationAcceptanceTest extends SshOracleDestinationAcceptanceTest {

@Override
public SshTunnel.TunnelMethod getTunnelMethod() {
return SshTunnel.TunnelMethod.SSH_KEY_AUTH;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.integrations.destination.oracle;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.functional.CheckedFunction;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.base.ssh.SshBastionContainer;
import io.airbyte.integrations.base.ssh.SshTunnel;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.jooq.JSONFormat;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.OracleContainer;

public abstract class SshOracleDestinationAcceptanceTest extends DestinationAcceptanceTest {

private static final JSONFormat JSON_FORMAT = new JSONFormat().recordFormat(JSONFormat.RecordFormat.OBJECT);

private final ExtendedNameTransformer namingResolver = new OracleNameTransformer();

private final String schemaName = "TEST_ORCL";

private final SshBastionContainer sshBastionContainer = new SshBastionContainer();

private OracleContainer db;

public abstract SshTunnel.TunnelMethod getTunnelMethod();

@Override
protected String getImageName() {
return "airbyte/destination-oracle:dev";
}

@Override
protected JsonNode getConfig() throws IOException, InterruptedException {
return sshBastionContainer.getTunnelConfig(getTunnelMethod(), getBasicOracleDbConfigBuider(db).put("schema", schemaName));
}

public ImmutableMap.Builder<Object, Object> getBasicOracleDbConfigBuider(OracleContainer db) {
return ImmutableMap.builder()
.put("host", Objects.requireNonNull(db.getContainerInfo().getNetworkSettings()
.getNetworks()
.get(((Network.NetworkImpl) sshBastionContainer.getNetWork()).getName())
.getIpAddress()))
.put("username", db.getUsername())
.put("password", db.getPassword())
.put("port", db.getExposedPorts().get(0))
.put("sid", db.getSid())
.put("schemas", List.of("JDBC_SPACE"))
.put("ssl", false);
}

@Override
protected List<String> resolveIdentifier(String identifier) {
final List<String> result = new ArrayList<>();
final String resolved = namingResolver.getIdentifier(identifier);
result.add(identifier);
result.add(resolved);
if (!resolved.startsWith("\"")) {
result.add(resolved.toLowerCase());
result.add(resolved.toUpperCase());
}
return result;
}

@Override
protected JsonNode getFailCheckConfig() throws Exception {
final JsonNode clone = Jsons.clone(getConfig());
((ObjectNode) clone).put("password", "wrong password");
return clone;
}

@Override
protected List<JsonNode> retrieveRecords(TestDestinationEnv testEnv, String streamName, String namespace, JsonNode streamSchema) throws Exception {
List<JsonNode> jsonNodes = retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namespace);
return jsonNodes
.stream()
.map(r -> Jsons.deserialize(r.get(JavaBaseConstants.COLUMN_NAME_DATA.toUpperCase()).asText()))
.collect(Collectors.toList());
}

@Override
protected List<JsonNode> retrieveNormalizedRecords(TestDestinationEnv env, String streamName, String namespace)
throws Exception {
String tableName = namingResolver.getIdentifier(streamName);
return retrieveRecordsFromTable(tableName, namespace);
}

private List<JsonNode> retrieveRecordsFromTable(final String tableName, final String schemaName) throws Exception {
final JsonNode config = getConfig();
return SshTunnel.sshWrap(
config,
OracleDestination.HOST_KEY,
OracleDestination.PORT_KEY,
(CheckedFunction<JsonNode, List<JsonNode>, Exception>) mangledConfig -> getDatabaseFromConfig(mangledConfig)
.query(
ctx -> ctx
.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC", schemaName, tableName, OracleDestination.COLUMN_NAME_EMITTED_AT)))
.stream()
.map(r -> r.formatJSON(JSON_FORMAT))
.map(Jsons::deserialize)
.collect(Collectors.toList()));
}

@Override
protected void setup(TestDestinationEnv testEnv) throws Exception {
startTestContainers();
SshTunnel.sshWrap(
getConfig(),
OracleDestination.HOST_KEY,
OracleDestination.PORT_KEY,
mangledConfig -> {
Database databaseFromConfig = getDatabaseFromConfig(mangledConfig);
databaseFromConfig.query(ctx -> ctx.fetch(String.format("CREATE USER %s IDENTIFIED BY %s", schemaName, schemaName)));
databaseFromConfig.query(ctx -> ctx.fetch(String.format("GRANT ALL PRIVILEGES TO %s", schemaName)));
});
}

private void startTestContainers() {
sshBastionContainer.initAndStartBastion();
initAndStartJdbcContainer();
}

private void initAndStartJdbcContainer() {
db = new OracleContainer("epiclabs/docker-oracle-xe-11g")
.withNetwork(sshBastionContainer.getNetWork());
db.start();
}

private Database getDatabaseFromConfig(final JsonNode config) {
return Databases.createDatabase(
config.get("username").asText(),
config.get("password").asText(),
String.format("jdbc:oracle:thin:@//%s:%s/%s",
config.get("host").asText(),
config.get("port").asText(),
config.get("sid").asText()),
"oracle.jdbc.driver.OracleDriver",
null);
}

@Override
protected void tearDown(TestDestinationEnv testEnv) throws Exception {
SshTunnel.sshWrap(
getConfig(),
OracleDestination.HOST_KEY,
OracleDestination.PORT_KEY,
mangledConfig -> {
Database databaseFromConfig = getDatabaseFromConfig(mangledConfig);
databaseFromConfig.query(ctx -> ctx.fetch(String.format("DROP USER %s CASCADE", schemaName)));
});

sshBastionContainer.stopAndCloseContainers(db);
}

@Override
protected boolean supportsDBT() {
return true;
}

@Override
protected boolean implementsNamespaces() {
return true;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.integrations.destination.oracle;

import io.airbyte.integrations.base.ssh.SshTunnel;

public class SshPasswordOracleDestinationAcceptanceTest extends SshOracleDestinationAcceptanceTest {

@Override
public SshTunnel.TunnelMethod getTunnelMethod() {
return SshTunnel.TunnelMethod.SSH_PASSWORD_AUTH;
}

}
Loading

0 comments on commit c256114

Please sign in to comment.