From 06fd6412a98f44378e871726002111998bd5db16 Mon Sep 17 00:00:00 2001 From: Oleksandr Sheheda Date: Tue, 12 Oct 2021 18:20:09 +0300 Subject: [PATCH] =?UTF-8?q?=F0=9F=8E=89=20Destination=20Oracle:=20implemen?= =?UTF-8?q?ted=20connection=20encryption=20using=20N=E2=80=A6=20(#6893)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * #6013 🎉 Destination Oracle: implemented connection encryption using NNE and TLS --- .../3986776d-2319-4de9-8af8-db14c0996e72.json | 2 +- .../seed/destination_definitions.yaml | 2 +- .../connectors/destination-oracle/Dockerfile | 2 +- .../destination/oracle/OracleDestination.java | 95 ++++++++- .../src/main/resources/spec.json | 85 +++++++- .../NneOracleDestinationAcceptanceTest.java | 78 ++++++++ .../oracle/OracleIntegrationTest.java | 160 --------------- .../SshOracleDestinationAcceptanceTest.java | 9 +- ...ryptedOracleDestinationAcceptanceTest.java | 182 ++++++++++++++++++ .../source/oracle/OracleSource.java | 3 +- docs/integrations/destinations/oracle.md | 48 ++++- 11 files changed, 482 insertions(+), 184 deletions(-) create mode 100644 airbyte-integrations/connectors/destination-oracle/src/test-integration/java/io/airbyte/integrations/destination/oracle/NneOracleDestinationAcceptanceTest.java delete mode 100644 airbyte-integrations/connectors/destination-oracle/src/test-integration/java/io/airbyte/integrations/destination/oracle/OracleIntegrationTest.java create mode 100644 airbyte-integrations/connectors/destination-oracle/src/test-integration/java/io/airbyte/integrations/destination/oracle/UnencryptedOracleDestinationAcceptanceTest.java diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/3986776d-2319-4de9-8af8-db14c0996e72.json b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/3986776d-2319-4de9-8af8-db14c0996e72.json index 58d3c80ef49a..a3a8e24b81f2 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/3986776d-2319-4de9-8af8-db14c0996e72.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/3986776d-2319-4de9-8af8-db14c0996e72.json @@ -2,6 +2,6 @@ "destinationDefinitionId": "3986776d-2319-4de9-8af8-db14c0996e72", "name": "Oracle (Alpha)", "dockerRepository": "airbyte/destination-oracle", - "dockerImageTag": "0.1.9", + "dockerImageTag": "0.1.10", "documentationUrl": "https://docs.airbyte.io/integrations/destinations/oracle" } diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index 5a6ff7920a8f..0639655780f1 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -73,7 +73,7 @@ - destinationDefinitionId: 3986776d-2319-4de9-8af8-db14c0996e72 name: Oracle (Alpha) dockerRepository: airbyte/destination-oracle - dockerImageTag: 0.1.9 + dockerImageTag: 0.1.10 documentationUrl: https://docs.airbyte.io/integrations/destinations/oracle - destinationDefinitionId: 9f760101-60ae-462f-9ee6-b7a9dafd454d name: Kafka diff --git a/airbyte-integrations/connectors/destination-oracle/Dockerfile b/airbyte-integrations/connectors/destination-oracle/Dockerfile index 0f02b2e2caa3..31c2b83246c1 100644 --- a/airbyte-integrations/connectors/destination-oracle/Dockerfile +++ b/airbyte-integrations/connectors/destination-oracle/Dockerfile @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.1.9 +LABEL io.airbyte.version=0.1.10 LABEL io.airbyte.name=airbyte/destination-oracle diff --git a/airbyte-integrations/connectors/destination-oracle/src/main/java/io/airbyte/integrations/destination/oracle/OracleDestination.java b/airbyte-integrations/connectors/destination-oracle/src/main/java/io/airbyte/integrations/destination/oracle/OracleDestination.java index a50a96ec530e..78dac999ee59 100644 --- a/airbyte-integrations/connectors/destination-oracle/src/main/java/io/airbyte/integrations/destination/oracle/OracleDestination.java +++ b/airbyte-integrations/connectors/destination-oracle/src/main/java/io/airbyte/integrations/destination/oracle/OracleDestination.java @@ -12,7 +12,12 @@ import io.airbyte.integrations.base.JavaBaseConstants; import io.airbyte.integrations.base.ssh.SshWrappedDestination; import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.RandomStringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -24,9 +29,20 @@ public class OracleDestination extends AbstractJdbcDestination implements Destin public static final String DRIVER_CLASS = "oracle.jdbc.OracleDriver"; - public static final String COLUMN_NAME_AB_ID = "\"" + JavaBaseConstants.COLUMN_NAME_AB_ID.toUpperCase() + "\""; - public static final String COLUMN_NAME_DATA = "\"" + JavaBaseConstants.COLUMN_NAME_DATA.toUpperCase() + "\""; - public static final String COLUMN_NAME_EMITTED_AT = "\"" + JavaBaseConstants.COLUMN_NAME_EMITTED_AT.toUpperCase() + "\""; + public static final String COLUMN_NAME_AB_ID = + "\"" + JavaBaseConstants.COLUMN_NAME_AB_ID.toUpperCase() + "\""; + public static final String COLUMN_NAME_DATA = + "\"" + JavaBaseConstants.COLUMN_NAME_DATA.toUpperCase() + "\""; + public static final String COLUMN_NAME_EMITTED_AT = + "\"" + JavaBaseConstants.COLUMN_NAME_EMITTED_AT.toUpperCase() + "\""; + + private static final String KEY_STORE_FILE_PATH = "clientkeystore.jks"; + private static final String KEY_STORE_PASS = RandomStringUtils.randomAlphanumeric(8); + + enum Protocol { + TCP, + TCPS + } public OracleDestination() { super(DRIVER_CLASS, new OracleNameTransformer(), new OracleOperations("users")); @@ -35,22 +51,85 @@ public OracleDestination() { @Override public JsonNode toJdbcConfig(JsonNode config) { + List additionalParameters = new ArrayList<>(); + + Protocol protocol = config.has("encryption") + ? obtainConnectionProtocol(config.get("encryption"), additionalParameters) + : Protocol.TCP; + final String connectionString = String.format( + "jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=%s)(HOST=%s)(PORT=%s))(CONNECT_DATA=(SID=%s)))", + protocol, + config.get("host").asText(), + config.get("port").asText(), + config.get("sid").asText()); + final ImmutableMap.Builder configBuilder = ImmutableMap.builder() .put("username", config.get("username").asText()) - .put("jdbc_url", String.format("jdbc:oracle:thin:@//%s:%s/%s", - config.get("host").asText(), - config.get("port").asText(), - config.get("sid").asText())); + .put("jdbc_url", connectionString); if (config.has("password")) { configBuilder.put("password", config.get("password").asText()); } + if (!additionalParameters.isEmpty()) { + String connectionParams = String.join(";", additionalParameters); + configBuilder.put("connection_properties", connectionParams); + } + return Jsons.jsonNode(configBuilder.build()); } + private Protocol obtainConnectionProtocol(JsonNode encryption, + List additionalParameters) { + String encryptionMethod = encryption.get("encryption_method").asText(); + switch (encryptionMethod) { + case "unencrypted" -> { + return Protocol.TCP; + } + case "client_nne" -> { + String algorithm = encryption.get("encryption_algorithm").asText(); + additionalParameters.add("oracle.net.encryption_client=REQUIRED"); + additionalParameters.add("oracle.net.encryption_types_client=( " + algorithm + " )"); + return Protocol.TCP; + } + case "encrypted_verify_certificate" -> { + try { + convertAndImportCertificate(encryption.get("ssl_certificate").asText()); + } catch (IOException | InterruptedException e) { + throw new RuntimeException("Failed to import certificate into Java Keystore"); + } + additionalParameters.add("javax.net.ssl.trustStore=" + KEY_STORE_FILE_PATH); + additionalParameters.add("javax.net.ssl.trustStoreType=JKS"); + additionalParameters.add("javax.net.ssl.trustStorePassword=" + KEY_STORE_PASS); + return Protocol.TCPS; + } + } + throw new RuntimeException( + "Failed to obtain connection protocol from config " + encryption.asText()); + } + + private static void convertAndImportCertificate(String certificate) + throws IOException, InterruptedException { + Runtime run = Runtime.getRuntime(); + try (PrintWriter out = new PrintWriter("certificate.pem")) { + out.print(certificate); + } + runProcess("openssl x509 -outform der -in certificate.pem -out certificate.der", run); + runProcess("keytool -import -alias rds-root -keystore " + KEY_STORE_FILE_PATH + + " -file certificate.der -storepass " + KEY_STORE_PASS + " -noprompt", run); + } + + private static void runProcess(String cmd, Runtime run) throws IOException, InterruptedException { + Process pr = run.exec(cmd); + if (!pr.waitFor(30, TimeUnit.SECONDS)) { + pr.destroy(); + throw new RuntimeException("Timeout while executing: " + cmd); + } + } + public static void main(String[] args) throws Exception { - final Destination destination = new SshWrappedDestination(new OracleDestination(), HOST_KEY, PORT_KEY); + final Destination destination = new SshWrappedDestination(new OracleDestination(), HOST_KEY, + PORT_KEY); LOGGER.info("starting destination: {}", OracleDestination.class); new IntegrationRunner(destination).run(args); LOGGER.info("completed destination: {}", OracleDestination.class); diff --git a/airbyte-integrations/connectors/destination-oracle/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-oracle/src/main/resources/spec.json index de8eb7a089a3..000b1b299128 100644 --- a/airbyte-integrations/connectors/destination-oracle/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/destination-oracle/src/main/resources/spec.json @@ -50,9 +50,92 @@ "title": "Default Schema", "description": "The default schema tables are written to if the source does not specify a namespace. The usual value for this field is \"airbyte\". In Oracle, schemas and users are the same thing, so the \"user\" parameter is used as the login credentials and this is used for the default Airbyte message schema.", "type": "string", - "examples": ["airbyte"], + "examples": [ + "airbyte" + ], "default": "airbyte", "order": 5 + }, + "encryption": { + "title": "Encryption", + "type": "object", + "description": "Encryption method to use when communicating with the database", + "order": 6, + "oneOf": [ + { + "title": "Unencrypted", + "additionalProperties": false, + "description": "Data transfer will not be encrypted.", + "required": [ + "encryption_method" + ], + "properties": { + "encryption_method": { + "type": "string", + "const": "unencrypted", + "enum": [ + "unencrypted" + ], + "default": "unencrypted" + } + } + }, + { + "title": "Native Network Ecryption (NNE)", + "additionalProperties": false, + "description": "Native network encryption gives you the ability to encrypt database connections, without the configuration overhead of TCP/IP and SSL/TLS and without the need to open and listen on different ports.", + "required": [ + "encryption_method" + ], + "properties": { + "encryption_method": { + "type": "string", + "const": "client_nne", + "enum": [ + "client_nne" + ], + "default": "client_nne" + }, + "encryption_algorithm": { + "type": "string", + "description": "This parameter defines the encryption algorithm to be used", + "title": "Encryption Algorithm", + "default": "AES256", + "enum": [ + "AES256", + "RC4_56", + "3DES168" + ] + } + } + }, + { + "title": "TLS Encrypted (verify certificate)", + "additionalProperties": false, + "description": "Verify and use the cert provided by the server.", + "required": [ + "encryption_method", + "ssl_certificate" + ], + "properties": { + "encryption_method": { + "type": "string", + "const": "encrypted_verify_certificate", + "enum": [ + "encrypted_verify_certificate" + ], + "default": "encrypted_verify_certificate" + }, + "ssl_certificate": { + "title": "SSL PEM file", + "description": "Privacy Enhanced Mail (PEM) files are concatenated certificate containers frequently used in certificate installations", + "type": "string", + "airbyte_secret": true, + "multiline": true + } + } + } + ] } } } diff --git a/airbyte-integrations/connectors/destination-oracle/src/test-integration/java/io/airbyte/integrations/destination/oracle/NneOracleDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-oracle/src/test-integration/java/io/airbyte/integrations/destination/oracle/NneOracleDestinationAcceptanceTest.java new file mode 100644 index 000000000000..fa3a8840515a --- /dev/null +++ b/airbyte-integrations/connectors/destination-oracle/src/test-integration/java/io/airbyte/integrations/destination/oracle/NneOracleDestinationAcceptanceTest.java @@ -0,0 +1,78 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.oracle; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.ImmutableMap; +import io.airbyte.commons.json.Jsons; +import io.airbyte.db.Databases; +import io.airbyte.db.jdbc.JdbcDatabase; +import java.sql.SQLException; +import java.util.List; +import java.util.stream.Collectors; +import org.junit.Test; + +public class NneOracleDestinationAcceptanceTest extends UnencryptedOracleDestinationAcceptanceTest { + + @Test + public void testEncryption() throws SQLException { + String algorithm = "AES256"; + + final JsonNode config = getConfig(); + ((ObjectNode) config).put("encryption", Jsons.jsonNode(ImmutableMap.builder() + .put("encryption_method", "client_nne") + .put("encryption_algorithm", algorithm) + .build())); + + JdbcDatabase database = Databases.createJdbcDatabase(config.get("username").asText(), + config.get("password").asText(), + String.format("jdbc:oracle:thin:@//%s:%s/%s", + config.get("host").asText(), + config.get("port").asText(), + config.get("sid").asText()), + "oracle.jdbc.driver.OracleDriver", + "oracle.net.encryption_client=REQUIRED;" + + "oracle.net.encryption_types_client=( " + + algorithm + " )"); + + String network_service_banner = "select network_service_banner from v$session_connect_info where sid in (select distinct sid from v$mystat)"; + List collect = database.query(network_service_banner).collect(Collectors.toList()); + + assertThat(collect.get(2).get("NETWORK_SERVICE_BANNER").asText(), + equals("Oracle Advanced Security: " + algorithm + " encryption")); + } + + @Test + public void testCheckProtocol() throws SQLException { + final JsonNode clone = Jsons.clone(getConfig()); + ((ObjectNode) clone).put("encryption", Jsons.jsonNode(ImmutableMap.builder() + .put("encryption_method", "client_nne") + .put("encryption_algorithm", "AES256") + .build())); + + String algorithm = clone.get("encryption") + .get("encryption_algorithm").asText(); + + JdbcDatabase database = Databases.createJdbcDatabase(clone.get("username").asText(), + clone.get("password").asText(), + String.format("jdbc:oracle:thin:@//%s:%s/%s", + clone.get("host").asText(), + clone.get("port").asText(), + clone.get("sid").asText()), + "oracle.jdbc.driver.OracleDriver", + "oracle.net.encryption_client=REQUIRED;" + + "oracle.net.encryption_types_client=( " + + algorithm + " )"); + + String network_service_banner = "SELECT sys_context('USERENV', 'NETWORK_PROTOCOL') as network_protocol FROM dual"; + List collect = database.query(network_service_banner).collect(Collectors.toList()); + + assertEquals("tcp", collect.get(0).get("NETWORK_PROTOCOL").asText()); + } +} diff --git a/airbyte-integrations/connectors/destination-oracle/src/test-integration/java/io/airbyte/integrations/destination/oracle/OracleIntegrationTest.java b/airbyte-integrations/connectors/destination-oracle/src/test-integration/java/io/airbyte/integrations/destination/oracle/OracleIntegrationTest.java deleted file mode 100644 index d055c477a9c1..000000000000 --- a/airbyte-integrations/connectors/destination-oracle/src/test-integration/java/io/airbyte/integrations/destination/oracle/OracleIntegrationTest.java +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Copyright (c) 2021 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.oracle; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ObjectNode; -import io.airbyte.commons.io.IOs; -import io.airbyte.commons.json.Jsons; -import io.airbyte.db.Database; -import io.airbyte.db.Databases; -import io.airbyte.integrations.destination.ExtendedNameTransformer; -import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest; -import java.nio.file.Path; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.List; -import java.util.stream.Collectors; -import org.jooq.JSONFormat; -import org.jooq.JSONFormat.RecordFormat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class OracleIntegrationTest extends DestinationAcceptanceTest { - - private static final Logger LOGGER = LoggerFactory.getLogger(OracleIntegrationTest.class); - private static final JSONFormat JSON_FORMAT = new JSONFormat().recordFormat(RecordFormat.OBJECT); - - private static JsonNode baseConfig; - private ExtendedNameTransformer namingResolver = new OracleNameTransformer(); - private static JsonNode config; - - public JsonNode getStaticConfig() { - return Jsons.deserialize(IOs.readFile(Path.of("secrets/config.json"))); - } - - @Override - protected String getImageName() { - return "airbyte/destination-oracle:dev"; - } - - @Override - protected JsonNode getConfig() { - return config; - } - - @Override - protected List retrieveRecords(TestDestinationEnv env, String streamName, String namespace, JsonNode streamSchema) throws Exception { - return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namespace) - .stream() - .map(r -> Jsons.deserialize(r.get(OracleDestination.COLUMN_NAME_DATA.replace("\"", "")).asText())) - .collect(Collectors.toList()); - } - - @Override - protected boolean implementsNamespaces() { - return true; - } - - @Override - protected boolean supportsDBT() { - return true; - } - - @Override - protected List retrieveNormalizedRecords(TestDestinationEnv env, String streamName, String namespace) - throws Exception { - String tableName = namingResolver.getIdentifier(streamName); - return retrieveRecordsFromTable(tableName, namespace); - } - - @Override - protected JsonNode getFailCheckConfig() { - final JsonNode invalidConfig = Jsons.clone(config); - ((ObjectNode) invalidConfig).put("password", "wrong password"); - return invalidConfig; - } - - @Override - protected List resolveIdentifier(String identifier) { - final List result = new ArrayList<>(); - final String resolved = namingResolver.getIdentifier(identifier); - result.add(identifier); - result.add(resolved); - if (!resolved.startsWith("\"")) { - result.add(resolved.toLowerCase()); - result.add(resolved.toUpperCase()); - } - return result; - } - - private List retrieveRecordsFromTable(String tableName, String schemaName) throws SQLException { - List result = getDatabase() - .query(ctx -> ctx.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC", schemaName, tableName, OracleDestination.COLUMN_NAME_EMITTED_AT)) - .stream() - .collect(Collectors.toList())); - return result - .stream() - .map(r -> r.formatJSON(JSON_FORMAT)) - .map(Jsons::deserialize) - .collect(Collectors.toList()); - } - - private static Database getDatabase() { - // todo (cgardens) - rework this abstraction so that we do not have to pass a null into the - // constructor. at least explicitly handle it, even if the impl doesn't change. - return Databases.createDatabase( - baseConfig.get("username").asText(), - baseConfig.get("password").asText(), - String.format("jdbc:oracle:thin:@//%s:%s/%s", - baseConfig.get("host").asText(), - baseConfig.get("port").asText(), - baseConfig.get("sid").asText()), - "oracle.jdbc.driver.OracleDriver", - null); - } - - private List allTables; - - private List getAllTables(Database db) { - try { - return db.query(ctx -> ctx.fetch("select OWNER, TABLE_NAME from ALL_TABLES where upper(TABLESPACE_NAME) = 'USERS'") - .stream() - .map(r -> String.format("%s.%s", r.get("OWNER"), r.get("TABLE_NAME"))) - .collect(Collectors.toList())); - } catch (SQLException e) { - LOGGER.error("Error while cleaning up test.", e); - return null; - } - } - - @Override - protected void setup(TestDestinationEnv testEnv) throws SQLException { - // config = getConfig(db); - baseConfig = getStaticConfig(); - config = Jsons.clone(baseConfig); - final Database database = getDatabase(); - allTables = getAllTables(database); - } - - @Override - protected void tearDown(TestDestinationEnv testEnv) { - config = getStaticConfig(); - - final Database database = getDatabase(); - var tables = getAllTables(database); - try { - for (String table : tables) { - database.query(ctx -> { - ctx.execute("drop table " + table); - return null; - }); - } - } catch (SQLException e) { - LOGGER.error("Error while cleaning up test.", e); - } - } - -} diff --git a/airbyte-integrations/connectors/destination-oracle/src/test-integration/java/io/airbyte/integrations/destination/oracle/SshOracleDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-oracle/src/test-integration/java/io/airbyte/integrations/destination/oracle/SshOracleDestinationAcceptanceTest.java index a011162ddc9e..5c736128af6e 100644 --- a/airbyte-integrations/connectors/destination-oracle/src/test-integration/java/io/airbyte/integrations/destination/oracle/SshOracleDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-oracle/src/test-integration/java/io/airbyte/integrations/destination/oracle/SshOracleDestinationAcceptanceTest.java @@ -45,10 +45,11 @@ protected String getImageName() { @Override protected JsonNode getConfig() throws IOException, InterruptedException { - return sshBastionContainer.getTunnelConfig(getTunnelMethod(), getBasicOracleDbConfigBuider(db).put("schema", schemaName)); + return sshBastionContainer.getTunnelConfig(getTunnelMethod(), + getBasicOracleDbConfigBuilder(db).put("schema", schemaName)); } - public ImmutableMap.Builder getBasicOracleDbConfigBuider(OracleContainer db) { + public ImmutableMap.Builder getBasicOracleDbConfigBuilder(OracleContainer db) { return ImmutableMap.builder() .put("host", Objects.requireNonNull(db.getContainerInfo().getNetworkSettings() .getNetworks() @@ -59,7 +60,9 @@ public ImmutableMap.Builder getBasicOracleDbConfigBuider(OracleC .put("port", db.getExposedPorts().get(0)) .put("sid", db.getSid()) .put("schemas", List.of("JDBC_SPACE")) - .put("ssl", false); + .put("encryption", Jsons.jsonNode(ImmutableMap.builder() + .put("encryption_method", "unencrypted") + .build())); } @Override diff --git a/airbyte-integrations/connectors/destination-oracle/src/test-integration/java/io/airbyte/integrations/destination/oracle/UnencryptedOracleDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-oracle/src/test-integration/java/io/airbyte/integrations/destination/oracle/UnencryptedOracleDestinationAcceptanceTest.java new file mode 100644 index 000000000000..2464824e3acf --- /dev/null +++ b/airbyte-integrations/connectors/destination-oracle/src/test-integration/java/io/airbyte/integrations/destination/oracle/UnencryptedOracleDestinationAcceptanceTest.java @@ -0,0 +1,182 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.oracle; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.ImmutableMap; +import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.string.Strings; +import io.airbyte.db.Database; +import io.airbyte.db.Databases; +import io.airbyte.db.jdbc.JdbcDatabase; +import io.airbyte.integrations.destination.ExtendedNameTransformer; +import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import org.jooq.JSONFormat; +import org.jooq.JSONFormat.RecordFormat; +import org.junit.Test; + +public class UnencryptedOracleDestinationAcceptanceTest extends DestinationAcceptanceTest { + + private static final JSONFormat JSON_FORMAT = new JSONFormat().recordFormat(RecordFormat.OBJECT); + + private ExtendedNameTransformer namingResolver = new OracleNameTransformer(); + private static OracleContainer db; + private static JsonNode config; + private final String schemaName = "TEST_ORCL"; + + @Override + protected String getImageName() { + return "airbyte/destination-oracle:dev"; + } + + private JsonNode getConfig(OracleContainer db) { + + return Jsons.jsonNode(ImmutableMap.builder() + .put("host", db.getHost()) + .put("port", db.getFirstMappedPort()) + .put("sid", db.getSid()) + .put("username", db.getUsername()) + .put("password", db.getPassword()) + .put("schemas", List.of("JDBC_SPACE")) + .put("encryption", Jsons.jsonNode(ImmutableMap.builder() + .put("encryption_method", "unencrypted") + .build())) + .build()); + } + + @Override + protected JsonNode getConfig() { + return Jsons.clone(config); + } + + @Override + protected List retrieveRecords(TestDestinationEnv env, String streamName, + String namespace, JsonNode streamSchema) throws Exception { + return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namespace) + .stream() + .map(r -> Jsons.deserialize( + r.get(OracleDestination.COLUMN_NAME_DATA.replace("\"", "")).asText())) + .collect(Collectors.toList()); + } + + @Override + protected boolean implementsNamespaces() { + return true; + } + + @Override + protected boolean supportsDBT() { + return true; + } + + @Override + protected List retrieveNormalizedRecords(TestDestinationEnv env, String streamName, + String namespace) + throws Exception { + String tableName = namingResolver.getIdentifier(streamName); + return retrieveRecordsFromTable(tableName, namespace); + } + + @Override + protected JsonNode getFailCheckConfig() { + final JsonNode invalidConfig = getConfig(); + ((ObjectNode) invalidConfig).put("password", "wrong password"); + return invalidConfig; + } + + @Override + protected List resolveIdentifier(String identifier) { + final List result = new ArrayList<>(); + final String resolved = namingResolver.getIdentifier(identifier); + result.add(identifier); + result.add(resolved); + if (!resolved.startsWith("\"")) { + result.add(resolved.toLowerCase()); + result.add(resolved.toUpperCase()); + } + return result; + } + + private List retrieveRecordsFromTable(String tableName, String schemaName) + throws SQLException { + List result = getDatabase(config) + .query(ctx -> ctx.fetch( + String.format("SELECT * FROM %s.%s ORDER BY %s ASC", schemaName, tableName, + OracleDestination.COLUMN_NAME_EMITTED_AT)) + .stream() + .collect(Collectors.toList())); + return result + .stream() + .map(r -> r.formatJSON(JSON_FORMAT)) + .map(Jsons::deserialize) + .collect(Collectors.toList()); + } + + private static Database getDatabase(JsonNode config) { + return Databases.createDatabase( + config.get("username").asText(), + config.get("password").asText(), + String.format("jdbc:oracle:thin:@//%s:%s/%s", + config.get("host").asText(), + config.get("port").asText(), + config.get("sid").asText()), + "oracle.jdbc.driver.OracleDriver", + null); + } + + @Override + protected void setup(TestDestinationEnv testEnv) throws Exception { + final String dbName = Strings.addRandomSuffix("db", "_", 10); + db = new OracleContainer() + .withUsername("test") + .withPassword("oracle") + .usingSid(); + db.start(); + + config = getConfig(db); + + final Database database = getDatabase(config); + database.query( + ctx -> ctx.fetch(String.format("CREATE USER %s IDENTIFIED BY %s", schemaName, schemaName))); + database.query(ctx -> ctx.fetch(String.format("GRANT ALL PRIVILEGES TO %s", schemaName))); + + database.close(); + + ((ObjectNode) config).put("schema", dbName); + } + + @Override + protected void tearDown(TestDestinationEnv testEnv) { + db.stop(); + db.close(); + } + + @Test + public void testNoneEncryption() throws SQLException { + JsonNode config = getConfig(); + + JdbcDatabase database = Databases.createJdbcDatabase(config.get("username").asText(), + config.get("password").asText(), + String.format("jdbc:oracle:thin:@//%s:%s/%s", + config.get("host").asText(), + config.get("port").asText(), + config.get("sid").asText()), + "oracle.jdbc.driver.OracleDriver"); + + String network_service_banner = "select network_service_banner from v$session_connect_info where sid in (select distinct sid from v$mystat)"; + List collect = database.query(network_service_banner).collect(Collectors.toList()); + + assertTrue(collect.get(1).get("NETWORK_SERVICE_BANNER").asText() + .contains("Oracle Advanced Security: encryption")); + } + +} diff --git a/airbyte-integrations/connectors/source-oracle/src/main/java/io/airbyte/integrations/source/oracle/OracleSource.java b/airbyte-integrations/connectors/source-oracle/src/main/java/io/airbyte/integrations/source/oracle/OracleSource.java index 77d70a127d35..cc53920b14df 100644 --- a/airbyte-integrations/connectors/source-oracle/src/main/java/io/airbyte/integrations/source/oracle/OracleSource.java +++ b/airbyte-integrations/connectors/source-oracle/src/main/java/io/airbyte/integrations/source/oracle/OracleSource.java @@ -109,7 +109,8 @@ private Protocol obtainConnectionProtocol(JsonNode encryption, List addi return Protocol.TCPS; } } - throw new RuntimeException("Failed to obtain connection protocol from config" + encryption.asText()); + throw new RuntimeException( + "Failed to obtain connection protocol from config " + encryption.asText()); } private static void convertAndImportCertificate(String certificate) throws IOException, InterruptedException { diff --git a/docs/integrations/destinations/oracle.md b/docs/integrations/destinations/oracle.md index 5d7039c969e9..705caf02d631 100644 --- a/docs/integrations/destinations/oracle.md +++ b/docs/integrations/destinations/oracle.md @@ -10,6 +10,7 @@ | Namespaces | Yes | | | Basic Normalization | Yes | Doesn't support for nested json yet | | SSH Tunnel Connection | Yes | | +| Encryption | Yes | Support Native Network Encryption (NNE) as well as TLS using SSL cert | ## Output Schema @@ -69,19 +70,50 @@ When using an SSH tunnel, you are configuring Airbyte to connect to an intermedi Using this feature requires additional configuration, when creating the source. We will talk through what each piece of configuration means. 1. Configure all fields for the source as you normally would, except `SSH Tunnel Method`. -2. `SSH Tunnel Method` defaults to `No Tunnel` \(meaning a direct connection\). If you want to use an SSH Tunnel choose `SSH Key Authentication` or `Password Authentication`. - 1. Choose `Key Authentication` if you will be using an RSA private key as your secret for establishing the SSH Tunnel \(see below for more information on generating this key\). - 2. Choose `Password Authentication` if you will be using a password as your secret for establishing the SSH Tunnel. -3. `SSH Tunnel Jump Server Host` refers to the intermediate \(bastion\) server that Airbyte will connect to. This should be a hostname or an IP Address. -4. `SSH Connection Port` is the port on the bastion server with which to make the SSH connection. The default port for SSH connections is `22`, so unless you have explicitly changed something, go with the default. -5. `SSH Login Username` is the username that Airbyte should use when connection to the bastion server. This is NOT the Oracle username. -6. If you are using `Password Authentication`, then `SSH Login Username` should be set to the password of the User from the previous step. If you are using `SSH Key Authentication` leave this blank. Again, this is not the Oracle password, but the password for the OS-user that Airbyte is using to perform commands on the bastion. -7. If you are using `SSH Key Authentication`, then `SSH Private Key` should be set to the RSA Private Key that you are using to create the SSH connection. This should be the full contents of the key file starting with `-----BEGIN RSA PRIVATE KEY-----` and ending with `-----END RSA PRIVATE KEY-----`. +2. `SSH Tunnel Method` defaults to `No Tunnel` \(meaning a direct connection\). If you want to use + an SSH Tunnel choose `SSH Key Authentication` or `Password Authentication`. + 1. Choose `Key Authentication` if you will be using an RSA private key as your secret for + establishing the SSH Tunnel \(see below for more information on generating this key\). + 2. Choose `Password Authentication` if you will be using a password as your secret for + establishing the SSH Tunnel. +3. `SSH Tunnel Jump Server Host` refers to the intermediate \(bastion\) server that Airbyte will + connect to. This should be a hostname or an IP Address. +4. `SSH Connection Port` is the port on the bastion server with which to make the SSH connection. + The default port for SSH connections is `22`, so unless you have explicitly changed something, go + with the default. +5. `SSH Login Username` is the username that Airbyte should use when connection to the bastion + server. This is NOT the Oracle username. +6. If you are using `Password Authentication`, then `SSH Login Username` should be set to the + password of the User from the previous step. If you are using `SSH Key Authentication` leave this + blank. Again, this is not the Oracle password, but the password for the OS-user that Airbyte is + using to perform commands on the bastion. +7. If you are using `SSH Key Authentication`, then `SSH Private Key` should be set to the RSA + Private Key that you are using to create the SSH connection. This should be the full contents of + the key file starting with `-----BEGIN RSA PRIVATE KEY-----` and ending + with `-----END RSA PRIVATE KEY-----`. + +## Encryption Options + +Airbite has the ability to connect to the Oracle source with 3 network connectivity options: + +1. `Unencrypted` the connection will be made using the TCP protocol. In this case, all data over the + network will be transmitted in unencrypted form. +2. `Native network encryption` gives you the ability to encrypt database connections, without the + configuration overhead of TCP / IP and SSL / TLS and without the need to open and listen on + different ports. In this case, the *SQLNET.ENCRYPTION_CLIENT* + option will always be set as *REQUIRED* by default: The client or server will only accept + encrypted traffic, but the user has the opportunity to choose an `Encryption algorithm` according + to the security policies he needs. +3. `TLS Encrypted` (verify certificate) - if this option is selected, data transfer will be + transfered using the TLS protocol, taking into account the handshake procedure and certificate + verification. To use this option, insert the content of the certificate issued by the server into + the `SSL PEM file` field ## Changelog | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | +| 0.1.10 | 2021-10-08 | [\#6893](https://github.com/airbytehq/airbyte/pull/6893)| 🎉 Destination Oracle: implemented connection encryption | | 0.1.9 | 2021-10-06 | [\#6611](https://github.com/airbytehq/airbyte/pull/6611) | 🐛 Destination Oracle: maxStringLength should be 128 | | 0.1.8 | 2021-09-28 | [\#6370](https://github.com/airbytehq/airbyte/pull/6370) | Add SSH Support for Oracle Destination | | 0.1.7 | 2021-08-30 | [\#5746](https://github.com/airbytehq/airbyte/pull/5746) | Use default column name for raw tables |