Skip to content

Commit

Permalink
πŸŽ‰ Destination Oracle: implemented connection encryption using N… (air…
Browse files Browse the repository at this point in the history
…bytehq#6893)

* airbytehq#6013 πŸŽ‰ Destination Oracle: implemented connection encryption using NNE and TLS
  • Loading branch information
alexandr-shegeda authored and schlattk committed Jan 4, 2022
1 parent c4d44bc commit 06fd641
Show file tree
Hide file tree
Showing 11 changed files with 482 additions and 184 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "3986776d-2319-4de9-8af8-db14c0996e72",
"name": "Oracle (Alpha)",
"dockerRepository": "airbyte/destination-oracle",
"dockerImageTag": "0.1.9",
"dockerImageTag": "0.1.10",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/oracle"
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
- destinationDefinitionId: 3986776d-2319-4de9-8af8-db14c0996e72
name: Oracle (Alpha)
dockerRepository: airbyte/destination-oracle
dockerImageTag: 0.1.9
dockerImageTag: 0.1.10
documentationUrl: https://docs.airbyte.io/integrations/destinations/oracle
- destinationDefinitionId: 9f760101-60ae-462f-9ee6-b7a9dafd454d
name: Kafka
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.9
LABEL io.airbyte.version=0.1.10
LABEL io.airbyte.name=airbyte/destination-oracle
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,12 @@
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.base.ssh.SshWrappedDestination;
import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.RandomStringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -24,9 +29,20 @@ public class OracleDestination extends AbstractJdbcDestination implements Destin

public static final String DRIVER_CLASS = "oracle.jdbc.OracleDriver";

public static final String COLUMN_NAME_AB_ID = "\"" + JavaBaseConstants.COLUMN_NAME_AB_ID.toUpperCase() + "\"";
public static final String COLUMN_NAME_DATA = "\"" + JavaBaseConstants.COLUMN_NAME_DATA.toUpperCase() + "\"";
public static final String COLUMN_NAME_EMITTED_AT = "\"" + JavaBaseConstants.COLUMN_NAME_EMITTED_AT.toUpperCase() + "\"";
public static final String COLUMN_NAME_AB_ID =
"\"" + JavaBaseConstants.COLUMN_NAME_AB_ID.toUpperCase() + "\"";
public static final String COLUMN_NAME_DATA =
"\"" + JavaBaseConstants.COLUMN_NAME_DATA.toUpperCase() + "\"";
public static final String COLUMN_NAME_EMITTED_AT =
"\"" + JavaBaseConstants.COLUMN_NAME_EMITTED_AT.toUpperCase() + "\"";

private static final String KEY_STORE_FILE_PATH = "clientkeystore.jks";
private static final String KEY_STORE_PASS = RandomStringUtils.randomAlphanumeric(8);

enum Protocol {
TCP,
TCPS
}

public OracleDestination() {
super(DRIVER_CLASS, new OracleNameTransformer(), new OracleOperations("users"));
Expand All @@ -35,22 +51,85 @@ public OracleDestination() {

@Override
public JsonNode toJdbcConfig(JsonNode config) {
List<String> additionalParameters = new ArrayList<>();

Protocol protocol = config.has("encryption")
? obtainConnectionProtocol(config.get("encryption"), additionalParameters)
: Protocol.TCP;
final String connectionString = String.format(
"jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=%s)(HOST=%s)(PORT=%s))(CONNECT_DATA=(SID=%s)))",
protocol,
config.get("host").asText(),
config.get("port").asText(),
config.get("sid").asText());

final ImmutableMap.Builder<Object, Object> configBuilder = ImmutableMap.builder()
.put("username", config.get("username").asText())
.put("jdbc_url", String.format("jdbc:oracle:thin:@//%s:%s/%s",
config.get("host").asText(),
config.get("port").asText(),
config.get("sid").asText()));
.put("jdbc_url", connectionString);

if (config.has("password")) {
configBuilder.put("password", config.get("password").asText());
}

if (!additionalParameters.isEmpty()) {
String connectionParams = String.join(";", additionalParameters);
configBuilder.put("connection_properties", connectionParams);
}

return Jsons.jsonNode(configBuilder.build());
}

private Protocol obtainConnectionProtocol(JsonNode encryption,
List<String> additionalParameters) {
String encryptionMethod = encryption.get("encryption_method").asText();
switch (encryptionMethod) {
case "unencrypted" -> {
return Protocol.TCP;
}
case "client_nne" -> {
String algorithm = encryption.get("encryption_algorithm").asText();
additionalParameters.add("oracle.net.encryption_client=REQUIRED");
additionalParameters.add("oracle.net.encryption_types_client=( " + algorithm + " )");
return Protocol.TCP;
}
case "encrypted_verify_certificate" -> {
try {
convertAndImportCertificate(encryption.get("ssl_certificate").asText());
} catch (IOException | InterruptedException e) {
throw new RuntimeException("Failed to import certificate into Java Keystore");
}
additionalParameters.add("javax.net.ssl.trustStore=" + KEY_STORE_FILE_PATH);
additionalParameters.add("javax.net.ssl.trustStoreType=JKS");
additionalParameters.add("javax.net.ssl.trustStorePassword=" + KEY_STORE_PASS);
return Protocol.TCPS;
}
}
throw new RuntimeException(
"Failed to obtain connection protocol from config " + encryption.asText());
}

private static void convertAndImportCertificate(String certificate)
throws IOException, InterruptedException {
Runtime run = Runtime.getRuntime();
try (PrintWriter out = new PrintWriter("certificate.pem")) {
out.print(certificate);
}
runProcess("openssl x509 -outform der -in certificate.pem -out certificate.der", run);
runProcess("keytool -import -alias rds-root -keystore " + KEY_STORE_FILE_PATH
+ " -file certificate.der -storepass " + KEY_STORE_PASS + " -noprompt", run);
}

private static void runProcess(String cmd, Runtime run) throws IOException, InterruptedException {
Process pr = run.exec(cmd);
if (!pr.waitFor(30, TimeUnit.SECONDS)) {
pr.destroy();
throw new RuntimeException("Timeout while executing: " + cmd);
}
}

public static void main(String[] args) throws Exception {
final Destination destination = new SshWrappedDestination(new OracleDestination(), HOST_KEY, PORT_KEY);
final Destination destination = new SshWrappedDestination(new OracleDestination(), HOST_KEY,
PORT_KEY);
LOGGER.info("starting destination: {}", OracleDestination.class);
new IntegrationRunner(destination).run(args);
LOGGER.info("completed destination: {}", OracleDestination.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,92 @@
"title": "Default Schema",
"description": "The default schema tables are written to if the source does not specify a namespace. The usual value for this field is \"airbyte\". In Oracle, schemas and users are the same thing, so the \"user\" parameter is used as the login credentials and this is used for the default Airbyte message schema.",
"type": "string",
"examples": ["airbyte"],
"examples": [
"airbyte"
],
"default": "airbyte",
"order": 5
},
"encryption": {
"title": "Encryption",
"type": "object",
"description": "Encryption method to use when communicating with the database",
"order": 6,
"oneOf": [
{
"title": "Unencrypted",
"additionalProperties": false,
"description": "Data transfer will not be encrypted.",
"required": [
"encryption_method"
],
"properties": {
"encryption_method": {
"type": "string",
"const": "unencrypted",
"enum": [
"unencrypted"
],
"default": "unencrypted"
}
}
},
{
"title": "Native Network Ecryption (NNE)",
"additionalProperties": false,
"description": "Native network encryption gives you the ability to encrypt database connections, without the configuration overhead of TCP/IP and SSL/TLS and without the need to open and listen on different ports.",
"required": [
"encryption_method"
],
"properties": {
"encryption_method": {
"type": "string",
"const": "client_nne",
"enum": [
"client_nne"
],
"default": "client_nne"
},
"encryption_algorithm": {
"type": "string",
"description": "This parameter defines the encryption algorithm to be used",
"title": "Encryption Algorithm",
"default": "AES256",
"enum": [
"AES256",
"RC4_56",
"3DES168"
]
}
}
},
{
"title": "TLS Encrypted (verify certificate)",
"additionalProperties": false,
"description": "Verify and use the cert provided by the server.",
"required": [
"encryption_method",
"ssl_certificate"
],
"properties": {
"encryption_method": {
"type": "string",
"const": "encrypted_verify_certificate",
"enum": [
"encrypted_verify_certificate"
],
"default": "encrypted_verify_certificate"
},
"ssl_certificate": {
"title": "SSL PEM file",
"description": "Privacy Enhanced Mail (PEM) files are concatenated certificate containers frequently used in certificate installations",
"type": "string",
"airbyte_secret": true,
"multiline": true
}
}
}
]
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.oracle;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Databases;
import io.airbyte.db.jdbc.JdbcDatabase;
import java.sql.SQLException;
import java.util.List;
import java.util.stream.Collectors;
import org.junit.Test;

public class NneOracleDestinationAcceptanceTest extends UnencryptedOracleDestinationAcceptanceTest {

@Test
public void testEncryption() throws SQLException {
String algorithm = "AES256";

final JsonNode config = getConfig();
((ObjectNode) config).put("encryption", Jsons.jsonNode(ImmutableMap.builder()
.put("encryption_method", "client_nne")
.put("encryption_algorithm", algorithm)
.build()));

JdbcDatabase database = Databases.createJdbcDatabase(config.get("username").asText(),
config.get("password").asText(),
String.format("jdbc:oracle:thin:@//%s:%s/%s",
config.get("host").asText(),
config.get("port").asText(),
config.get("sid").asText()),
"oracle.jdbc.driver.OracleDriver",
"oracle.net.encryption_client=REQUIRED;" +
"oracle.net.encryption_types_client=( "
+ algorithm + " )");

String network_service_banner = "select network_service_banner from v$session_connect_info where sid in (select distinct sid from v$mystat)";
List<JsonNode> collect = database.query(network_service_banner).collect(Collectors.toList());

assertThat(collect.get(2).get("NETWORK_SERVICE_BANNER").asText(),
equals("Oracle Advanced Security: " + algorithm + " encryption"));
}

@Test
public void testCheckProtocol() throws SQLException {
final JsonNode clone = Jsons.clone(getConfig());
((ObjectNode) clone).put("encryption", Jsons.jsonNode(ImmutableMap.builder()
.put("encryption_method", "client_nne")
.put("encryption_algorithm", "AES256")
.build()));

String algorithm = clone.get("encryption")
.get("encryption_algorithm").asText();

JdbcDatabase database = Databases.createJdbcDatabase(clone.get("username").asText(),
clone.get("password").asText(),
String.format("jdbc:oracle:thin:@//%s:%s/%s",
clone.get("host").asText(),
clone.get("port").asText(),
clone.get("sid").asText()),
"oracle.jdbc.driver.OracleDriver",
"oracle.net.encryption_client=REQUIRED;" +
"oracle.net.encryption_types_client=( "
+ algorithm + " )");

String network_service_banner = "SELECT sys_context('USERENV', 'NETWORK_PROTOCOL') as network_protocol FROM dual";
List<JsonNode> collect = database.query(network_service_banner).collect(Collectors.toList());

assertEquals("tcp", collect.get(0).get("NETWORK_PROTOCOL").asText());
}
}
Loading

0 comments on commit 06fd641

Please sign in to comment.