Skip to content

Commit

Permalink
🎉Destination MySQl - added ssl strict encrypt connector (airbytehq#6763)
Browse files Browse the repository at this point in the history
* [6423] Destination MySQl - added ssl strict encrypt connector
  • Loading branch information
etsybaev authored and schlattk committed Jan 4, 2022
1 parent d52620c commit d4b5b99
Show file tree
Hide file tree
Showing 12 changed files with 544 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
*
!Dockerfile
!build
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
FROM airbyte/integration-base-java:dev

WORKDIR /airbyte

ENV APPLICATION destination-mysql-strict-encrypt

COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.name=airbyte/destination-mysql-strict-encrypt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# MySql Strict Encrypt Test Configuration

In order to test the MySql destination, you need to have the up and running MySql database that has SSL enabled.

This connector inherits the MySql destination, but support SSL connections only.
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
plugins {
id 'application'
id 'airbyte-docker'
id 'airbyte-integration-test-java'
}

application {
mainClass = 'io.airbyte.integrations.destination.mysql.MySQLDestinationStrictEncrypt'
applicationDefaultJvmArgs = ['-XX:MaxRAMPercentage=75.0']
}

dependencies {
implementation project(':airbyte-db:lib')
implementation project(':airbyte-integrations:bases:base-java')
implementation project(':airbyte-protocol:models')
implementation project(':airbyte-integrations:connectors:destination-jdbc')
implementation project(':airbyte-integrations:connectors:destination-mysql')

implementation 'mysql:mysql-connector-java:8.0.22'

integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-mysql')
integrationTestJavaImplementation "org.testcontainers:mysql:1.15.1"

implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
integrationTestJavaImplementation files(project(':airbyte-integrations:bases:base-normalization').airbyteDocker.outputs)
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.mysql;

import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.spec_modification.SpecModifyingDestination;
import io.airbyte.protocol.models.ConnectorSpecification;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySQLDestinationStrictEncrypt extends SpecModifyingDestination implements Destination {

private static final Logger LOGGER = LoggerFactory.getLogger(MySQLDestinationStrictEncrypt.class);

public MySQLDestinationStrictEncrypt() {
super(MySQLDestination.sshWrappedDestination());
}

@Override
public ConnectorSpecification modifySpec(ConnectorSpecification originalSpec) {
final ConnectorSpecification spec = Jsons.clone(originalSpec);
((ObjectNode) spec.getConnectionSpecification().get("properties")).remove("ssl");
return spec;
}

public static void main(String[] args) throws Exception {
final Destination destination = new MySQLDestinationStrictEncrypt();
LOGGER.info("starting destination: {}", MySQLDestinationStrictEncrypt.class);
new IntegrationRunner(destination).run(args);
LOGGER.info("completed destination: {}", MySQLDestinationStrictEncrypt.class);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.mysql;

import static org.junit.jupiter.api.Assertions.assertEquals;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Databases;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.sql.SQLException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.jooq.JSONFormat;
import org.jooq.JSONFormat.RecordFormat;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.MySQLContainer;

public class MySQLStrictEncryptDestinationAcceptanceTest extends DestinationAcceptanceTest {

private static final JSONFormat JSON_FORMAT = new JSONFormat().recordFormat(RecordFormat.OBJECT);

private MySQLContainer<?> db;
private final ExtendedNameTransformer namingResolver = new MySQLNameTransformer();

@Override
protected String getImageName() {
return "airbyte/destination-mysql-strict-encrypt:dev";
}

@Override
protected boolean supportsDBT() {
return true;
}

@Override
protected boolean implementsNamespaces() {
return true;
}

@Override
protected boolean supportsNormalization() {
return true;
}

@Override
protected JsonNode getConfig() {
return Jsons.jsonNode(ImmutableMap.builder()
.put("host", db.getHost())
.put("username", db.getUsername())
.put("password", db.getPassword())
.put("database", db.getDatabaseName())
.put("port", db.getFirstMappedPort())
.build());
}

@Override
protected JsonNode getFailCheckConfig() {
return Jsons.jsonNode(ImmutableMap.builder()
.put("host", db.getHost())
.put("username", db.getUsername())
.put("password", "wrong password")
.put("database", db.getDatabaseName())
.put("port", db.getFirstMappedPort())
.build());
}

@Override
protected String getDefaultSchema(JsonNode config) {
if (config.get("database") == null) {
return null;
}
return config.get("database").asText();
}

@Override
protected List<JsonNode> retrieveRecords(TestDestinationEnv testEnv,
String streamName,
String namespace,
JsonNode streamSchema)
throws Exception {
return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namespace)
.stream()
.map(r -> Jsons.deserialize(r.get(JavaBaseConstants.COLUMN_NAME_DATA).asText()))
.collect(Collectors.toList());
}

private List<JsonNode> retrieveRecordsFromTable(String tableName, String schemaName) throws SQLException {
return Databases.createDatabase(
db.getUsername(),
db.getPassword(),
String.format("jdbc:mysql://%s:%s/%s?useSSL=true&requireSSL=true&verifyServerCertificate=false",
db.getHost(),
db.getFirstMappedPort(),
db.getDatabaseName()),
"com.mysql.cj.jdbc.Driver",
SQLDialect.MYSQL).query(
ctx -> ctx
.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName,
JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
.stream()
.map(r -> r.formatJSON(JSON_FORMAT))
.map(Jsons::deserialize)
.collect(Collectors.toList()));
}

@Override
protected List<JsonNode> retrieveNormalizedRecords(TestDestinationEnv testEnv, String streamName, String namespace) throws Exception {
String tableName = namingResolver.getIdentifier(streamName);
String schema = namingResolver.getIdentifier(namespace);
return retrieveRecordsFromTable(tableName, schema);
}

@Override
protected List<String> resolveIdentifier(String identifier) {
final List<String> result = new ArrayList<>();
final String resolved = namingResolver.getIdentifier(identifier);
result.add(identifier);
result.add(resolved);
if (!resolved.startsWith("\"")) {
result.add(resolved.toLowerCase());
}
return result;
}

@Override
protected void setup(TestDestinationEnv testEnv) {
db = new MySQLContainer<>("mysql:8.0");
db.start();
setLocalInFileToTrue();
revokeAllPermissions();
grantCorrectPermissions();
}

private void setLocalInFileToTrue() {
executeQuery("set global local_infile=true");
}

private void revokeAllPermissions() {
executeQuery("REVOKE ALL PRIVILEGES, GRANT OPTION FROM " + db.getUsername() + "@'%';");
}

private void grantCorrectPermissions() {
executeQuery("GRANT ALTER, CREATE, INSERT, SELECT, DROP ON *.* TO " + db.getUsername() + "@'%';");
}

private void executeQuery(String query) {
try {
Databases.createDatabase(
"root",
"test",
String.format("jdbc:mysql://%s:%s/%s?useSSL=true&requireSSL=true&verifyServerCertificate=false",
db.getHost(),
db.getFirstMappedPort(),
db.getDatabaseName()),
"com.mysql.cj.jdbc.Driver",
SQLDialect.MYSQL).query(
ctx -> ctx
.execute(query));
} catch (SQLException e) {
throw new RuntimeException(e);
}
}

@Override
protected void tearDown(TestDestinationEnv testEnv) {
db.stop();
db.close();
}

@Override
@Test
public void testCustomDbtTransformations() {
// We need to create view for testing custom dbt transformations
executeQuery("GRANT CREATE VIEW ON *.* TO " + db.getUsername() + "@'%';");
}

@Test
public void testJsonSync() throws Exception {
final String catalogAsText = "{\n"
+ " \"streams\": [\n"
+ " {\n"
+ " \"name\": \"exchange_rate\",\n"
+ " \"json_schema\": {\n"
+ " \"properties\": {\n"
+ " \"id\": {\n"
+ " \"type\": \"integer\"\n"
+ " },\n"
+ " \"data\": {\n"
+ " \"type\": \"string\"\n"
+ " }"
+ " }\n"
+ " }\n"
+ " }\n"
+ " ]\n"
+ "}\n";

final AirbyteCatalog catalog = Jsons.deserialize(catalogAsText, AirbyteCatalog.class);
final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(catalog);
final List<AirbyteMessage> messages = Lists.newArrayList(
new AirbyteMessage()
.withType(Type.RECORD)
.withRecord(new AirbyteRecordMessage()
.withStream(catalog.getStreams().get(0).getName())
.withEmittedAt(Instant.now().toEpochMilli())
.withData(Jsons.jsonNode(ImmutableMap.builder()
.put("id", 1)
.put("data", "{\"name\":\"Conferência Faturamento - Custo - Taxas - Margem - Resumo ano inicial até -2\",\"description\":null}")
.build()))),
new AirbyteMessage()
.withType(Type.STATE)
.withState(new AirbyteStateMessage().withData(Jsons.jsonNode(ImmutableMap.of("checkpoint", 2)))));

final JsonNode config = getConfig();
final String defaultSchema = getDefaultSchema(config);
runSyncAndVerifyStateOutput(config, messages, configuredCatalog, false);
retrieveRawRecordsAndAssertSameMessages(catalog, messages, defaultSchema);
}

@Override
@Test
public void testLineBreakCharacters() {
// overrides test with a no-op until we handle full UTF-8 in the destination
}

protected void assertSameValue(JsonNode expectedValue, JsonNode actualValue) {
if (expectedValue.isBoolean()) {
// Boolean in MySQL are stored as TINYINT (0 or 1) so we force them to boolean values here
assertEquals(expectedValue.asBoolean(), actualValue.asBoolean());
} else {
assertEquals(expectedValue, actualValue);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.mysql;

import static org.junit.jupiter.api.Assertions.assertEquals;

import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.protocol.models.ConnectorSpecification;
import org.junit.jupiter.api.Test;

class MySqlDestinationStrictEncryptTest {

@Test
void testGetSpec() throws Exception {
System.out.println(new MySQLDestinationStrictEncrypt().spec().getConnectionSpecification());
assertEquals(Jsons.deserialize(MoreResources.readResource("expected_spec.json"), ConnectorSpecification.class),
new MySQLDestinationStrictEncrypt().spec());
}

}
Loading

0 comments on commit d4b5b99

Please sign in to comment.