diff --git a/airbyte-integrations/connectors/destination-databricks/.dockerignore b/airbyte-integrations/connectors/destination-databricks/.dockerignore new file mode 100644 index 000000000000..65c7d0ad3e73 --- /dev/null +++ b/airbyte-integrations/connectors/destination-databricks/.dockerignore @@ -0,0 +1,3 @@ +* +!Dockerfile +!build diff --git a/airbyte-integrations/connectors/destination-databricks/Dockerfile b/airbyte-integrations/connectors/destination-databricks/Dockerfile new file mode 100644 index 000000000000..4c3d7fc644ca --- /dev/null +++ b/airbyte-integrations/connectors/destination-databricks/Dockerfile @@ -0,0 +1,11 @@ +FROM airbyte/integration-base-java:dev + +WORKDIR /airbyte +ENV APPLICATION destination-databricks + +COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar + +RUN tar xf ${APPLICATION}.tar --strip-components=1 + +LABEL io.airbyte.version=0.1.0 +LABEL io.airbyte.name=airbyte/destination-databricks diff --git a/airbyte-integrations/connectors/destination-databricks/README.md b/airbyte-integrations/connectors/destination-databricks/README.md new file mode 100644 index 000000000000..70c175071357 --- /dev/null +++ b/airbyte-integrations/connectors/destination-databricks/README.md @@ -0,0 +1,67 @@ +# Destination Databricks + +This is the repository for the Databricks destination connector in Java. +For information about how to use this connector within Airbyte, see [the User Documentation](https://docs.airbyte.io/integrations/destinations/databricks). + +## Local development + +#### Building via Gradle +From the Airbyte repository root, run: +``` +./gradlew :airbyte-integrations:connectors:destination-databricks:build +``` + +#### Create credentials +**If you are a community contributor**, generate the necessary credentials and place them in `secrets/config.json` conforming to the spec file in `src/main/resources/spec.json`. +Note that the `secrets` directory is git-ignored by default, so there is no danger of accidentally checking in sensitive information. + +**If you are an Airbyte core member**, follow the [instructions](https://docs.airbyte.io/connector-development#using-credentials-in-ci) to set up the credentials. + +### Locally running the connector docker image + +#### Build +Build the connector image via Gradle: +``` +./gradlew :airbyte-integrations:connectors:destination-databricks:airbyteDocker +``` +When building via Gradle, the docker image name and tag, respectively, are the values of the `io.airbyte.name` and `io.airbyte.version` `LABEL`s in +the Dockerfile. + +#### Run +Then run any of the connector commands as follows: +``` +docker run --rm airbyte/destination-databricks:dev spec +docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-databricks:dev check --config /secrets/config.json +docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/destination-databricks:dev write --config /secrets/config.json --catalog /integration_tests/configured_catalog.json +``` + +## Testing +We use `JUnit` for Java tests. + +### Unit and Integration Tests +Place unit tests under `src/test/java/io/airbyte/integrations/destination/databricks`. + +#### Acceptance Tests +Airbyte has a standard test suite that all destination connectors must pass. Implement the `TODO`s in +`src/test-integration/java/io/airbyte/integrations/destination/databricks/DatabricksDestinationAcceptanceTest.java`.
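+
+As a hedged starting point for those `TODO`s (a sketch, not the committed implementation), `getConfig()` and `getFailCheckConfig()` could read the `secrets/config.json` created in the "Create credentials" step and then corrupt one required field; the field name `pat` comes from `src/main/resources/spec.json`, and the `IOs`/`Jsons` helpers are assumed to be available from Airbyte commons:
+```
+// Assumed imports: com.fasterxml.jackson.databind.JsonNode,
+// com.fasterxml.jackson.databind.node.ObjectNode,
+// io.airbyte.commons.io.IOs, io.airbyte.commons.json.Jsons, java.nio.file.Path
+
+@Override
+protected JsonNode getConfig() {
+  // Valid credentials, kept out of git under secrets/.
+  return Jsons.deserialize(IOs.readFile(Path.of("secrets/config.json")));
+}
+
+@Override
+protected JsonNode getFailCheckConfig() {
+  // Copy the valid config and break the personal access token so `check` must fail.
+  JsonNode invalidConfig = getConfig().deepCopy();
+  ((ObjectNode) invalidConfig).put("pat", "invalid-token");
+  return invalidConfig;
+}
+```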
+ +### Using gradle to run tests +All commands should be run from airbyte project root. +To run unit tests: +``` +./gradlew :airbyte-integrations:connectors:destination-databricks:unitTest +``` +To run acceptance and custom integration tests: +``` +./gradlew :airbyte-integrations:connectors:destination-databricks:integrationTest +``` + +## Dependency Management + +### Publishing a new version of the connector +You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what? +1. Make sure your changes are passing unit and integration tests. +1. Bump the connector version in `Dockerfile` -- just increment the value of the `LABEL io.airbyte.version` appropriately (we use [SemVer](https://semver.org/)). +1. Create a Pull Request. +1. Pat yourself on the back for being an awesome contributor. +1. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master. diff --git a/airbyte-integrations/connectors/destination-databricks/build.gradle b/airbyte-integrations/connectors/destination-databricks/build.gradle new file mode 100644 index 000000000000..f260d1903cc8 --- /dev/null +++ b/airbyte-integrations/connectors/destination-databricks/build.gradle @@ -0,0 +1,29 @@ +plugins { + id 'application' + id 'airbyte-docker' + id 'airbyte-integration-test-java' +} + +application { + mainClass = 'io.airbyte.integrations.destination.databricks.DatabricksDestination' +} + +dependencies { + implementation project(':airbyte-db:lib') + implementation project(':airbyte-config:models') + implementation project(':airbyte-protocol:models') + implementation project(':airbyte-integrations:bases:base-java') + implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs) + implementation project(':airbyte-integrations:connectors:destination-jdbc') + implementation project(':airbyte-integrations:connectors:destination-s3') + + // parquet + implementation group: 'org.apache.hadoop', name: 'hadoop-common', version: '3.3.0' + implementation group: 'org.apache.hadoop', name: 'hadoop-aws', version: '3.3.0' + implementation group: 'org.apache.hadoop', name: 'hadoop-mapreduce-client-core', version: '3.3.0' + implementation group: 'org.apache.parquet', name: 'parquet-avro', version: '1.12.0' + implementation group: 'tech.allegro.schema.json2avro', name: 'converter', version: '0.2.10' + + integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test') + integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-databricks') +} diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestination.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestination.java new file mode 100644 index 000000000000..f90fc2353271 --- /dev/null +++ b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestination.java @@ -0,0 +1,91 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of 
the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.databricks; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.db.Databases; +import io.airbyte.db.jdbc.JdbcDatabase; +import io.airbyte.integrations.base.AirbyteMessageConsumer; +import io.airbyte.integrations.base.IntegrationRunner; +import io.airbyte.integrations.destination.ExtendedNameTransformer; +import io.airbyte.integrations.destination.jdbc.SqlOperations; +import io.airbyte.integrations.destination.jdbc.copy.CopyConsumerFactory; +import io.airbyte.integrations.destination.jdbc.copy.CopyDestination; +import io.airbyte.integrations.destination.jdbc.copy.s3.S3Config; +import io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import java.util.function.Consumer; + +public class DatabricksDestination extends CopyDestination { + + private static final String DRIVER_CLASS = "com.simba.spark.jdbc.Driver"; + + public static void main(String[] args) throws Exception { + new IntegrationRunner(new DatabricksDestination()).run(args); + } + + @Override + public AirbyteMessageConsumer getConsumer(JsonNode config, ConfiguredAirbyteCatalog catalog, Consumer outputRecordCollector) { + return CopyConsumerFactory.create( + outputRecordCollector, + getDatabase(config), + getSqlOperations(), + getNameTransformer(), + S3Config.getS3Config(config), + catalog, + new DatabricksStreamCopierFactory(), + config.get("schema").asText().equals("") ? 
"default" : config.get("schema").asText() + ); + } + + @Override + public void checkPersistence(JsonNode config) { + S3StreamCopier.attemptS3WriteAndDelete(S3Config.getS3Config(config)); + } + + @Override + public ExtendedNameTransformer getNameTransformer() { + return new DatabricksNameTransformer(); + } + + @Override + public JdbcDatabase getDatabase(JsonNode databricksConfig) { + return Databases.createJdbcDatabase( + "token", + databricksConfig.get("pat").asText(), + String.format("jdbc:spark://%s:443/default;transportMode=http;ssl=1;httpPath=%s", + databricksConfig.get("serverHostname").asText(), + databricksConfig.get("httpPath").asText()), + DRIVER_CLASS + ); + } + + @Override + public SqlOperations getSqlOperations() { + return new DatabricksSqlOperations(); + } + +} diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksNameTransformer.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksNameTransformer.java new file mode 100644 index 000000000000..141b2b4e29c9 --- /dev/null +++ b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksNameTransformer.java @@ -0,0 +1,86 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.databricks; + +import io.airbyte.integrations.destination.ExtendedNameTransformer; + +/** + * TODO: Replace below MySQL docstring with Databricks equiv. + * + * Note that MySQL documentation discusses about identifiers case sensitivity using the + * lower_case_table_names system variable. As one of their recommendation is: "It is best to adopt a + * consistent convention, such as always creating and referring to databases and tables using + * lowercase names. This convention is recommended for maximum portability and ease of use. + * + * Source: https://dev.mysql.com/doc/refman/8.0/en/identifier-case-sensitivity.html" + * + * As a result, we are here forcing all identifier (table, schema and columns) names to lowercase. 
+ */ +public class DatabricksNameTransformer extends ExtendedNameTransformer { + + // These constants must match those in destination_name_transformer.py + public static final int MAX_MYSQL_NAME_LENGTH = 64; + // DBT appends a suffix to table names + public static final int TRUNCATE_DBT_RESERVED_SIZE = 12; + // 4 characters for 1 underscore and a 3-character suffix (e.g. _ab1) + // 4 characters for 1 underscore and a 3-character schema hash + public static final int TRUNCATE_RESERVED_SIZE = 8; + public static final int TRUNCATION_MAX_NAME_LENGTH = MAX_MYSQL_NAME_LENGTH - TRUNCATE_DBT_RESERVED_SIZE - TRUNCATE_RESERVED_SIZE; + + @Override + public String getIdentifier(String name) { + String identifier = applyDefaultCase(super.getIdentifier(name)); + return truncateName(identifier, TRUNCATION_MAX_NAME_LENGTH); + } + + @Override + public String getTmpTableName(String streamName) { + String tmpTableName = applyDefaultCase(super.getTmpTableName(streamName)); + return truncateName(tmpTableName, TRUNCATION_MAX_NAME_LENGTH); + } + + @Override + public String getRawTableName(String streamName) { + String rawTableName = applyDefaultCase(super.getRawTableName(streamName)); + return truncateName(rawTableName, TRUNCATION_MAX_NAME_LENGTH); + } + + static String truncateName(String name, int maxLength) { + if (name.length() <= maxLength) { + return name; + } + + int allowedLength = maxLength - 2; + String prefix = name.substring(0, allowedLength / 2); + String suffix = name.substring(name.length() - allowedLength / 2); + return prefix + "__" + suffix; + } + + @Override + protected String applyDefaultCase(String input) { + return input.toLowerCase(); + } + +} diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksSqlOperations.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksSqlOperations.java new file mode 100644 index 000000000000..31ba481c8f67 --- /dev/null +++ b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksSqlOperations.java @@ -0,0 +1,178 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE.
+ */ + +package io.airbyte.integrations.destination.databricks; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.db.jdbc.JdbcDatabase; +import io.airbyte.integrations.base.JavaBaseConstants; +import io.airbyte.integrations.destination.StandardNameTransformer; +import io.airbyte.integrations.destination.jdbc.JdbcSqlOperations; +import io.airbyte.protocol.models.AirbyteRecordMessage; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.List; +import java.util.stream.Collectors; + +public class DatabricksSqlOperations extends JdbcSqlOperations { + + private boolean isLocalFileEnabled = false; + + @Override + public void executeTransaction(JdbcDatabase database, List queries) throws Exception { + database.executeWithinTransaction(queries); + } + + @Override + public void insertRecordsInternal(JdbcDatabase database, + List records, + String schemaName, + String tmpTableName) + throws SQLException { + if (records.isEmpty()) { + return; + } + + verifyLocalFileEnabled(database); + try { + File tmpFile = Files.createTempFile(tmpTableName + "-", ".tmp").toFile(); + + loadDataIntoTable(database, records, schemaName, tmpTableName, tmpFile); + + Files.delete(tmpFile.toPath()); + } catch (IOException e) { + throw new SQLException(e); + } + } + + private void loadDataIntoTable(JdbcDatabase database, + List records, + String schemaName, + String tmpTableName, + File tmpFile) + throws SQLException { + database.execute(connection -> { + try { + writeBatchToFile(tmpFile, records); + + String absoluteFile = "'" + tmpFile.getAbsolutePath() + "'"; + + String query = String.format( + "LOAD DATA LOCAL INFILE %s INTO TABLE %s.%s FIELDS TERMINATED BY ',' ENCLOSED BY '\"' ESCAPED BY '\\\"' LINES TERMINATED BY '\\r\\n'", + absoluteFile, schemaName, tmpTableName); + + try (Statement stmt = connection.createStatement()) { + stmt.execute(query); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + @Override + protected JsonNode formatData(JsonNode data) { + return StandardNameTransformer.formatJsonPath(data); + } + + void verifyLocalFileEnabled(JdbcDatabase database) throws SQLException { + boolean localFileEnabled = isLocalFileEnabled || checkIfLocalFileIsEnabled(database); + if (!localFileEnabled) { + tryEnableLocalFile(database); + } + isLocalFileEnabled = true; + } + + private void tryEnableLocalFile(JdbcDatabase database) throws SQLException { + database.execute(connection -> { + try (Statement statement = connection.createStatement()) { + statement.execute("set global local_infile=true"); + } catch (Exception e) { + throw new RuntimeException( + "The DB user provided to airbyte was unable to switch on the local_infile attribute on the MySQL server. 
As an admin user, you will need to run \"SET GLOBAL local_infile = true\" before syncing data with Airbyte.", + e); + } + }); + } + + private double getVersion(JdbcDatabase database) throws SQLException { + List value = database.resultSetQuery(connection -> connection.createStatement().executeQuery("select version()"), + resultSet -> resultSet.getString("version()")).collect(Collectors.toList()); + return Double.parseDouble(value.get(0).substring(0, 3)); + } + + VersionCompatibility isCompatibleVersion(JdbcDatabase database) throws SQLException { + double version = getVersion(database); + return new VersionCompatibility(version, version >= 5.7); + } + + @Override + public boolean isSchemaRequired() { + return false; + } + + private boolean checkIfLocalFileIsEnabled(JdbcDatabase database) throws SQLException { + List value = database.resultSetQuery(connection -> connection.createStatement().executeQuery("SHOW GLOBAL VARIABLES LIKE 'local_infile'"), + resultSet -> resultSet.getString("Value")).collect(Collectors.toList()); + + return value.get(0).equalsIgnoreCase("on"); + } + + @Override + public String createTableQuery(JdbcDatabase database, String schemaName, String tableName) { + // MySQL requires byte information with VARCHAR. Since we are using uuid as value for the column, + // 256 is enough + return String.format( + "CREATE TABLE IF NOT EXISTS %s.%s ( \n" + + "%s VARCHAR(256) PRIMARY KEY,\n" + + "%s JSON,\n" + + "%s TIMESTAMP(6) DEFAULT CURRENT_TIMESTAMP(6)\n" + + ");\n", + schemaName, tableName, JavaBaseConstants.COLUMN_NAME_AB_ID, JavaBaseConstants.COLUMN_NAME_DATA, JavaBaseConstants.COLUMN_NAME_EMITTED_AT); + } + + public static class VersionCompatibility { + + private final double version; + private final boolean isCompatible; + + public VersionCompatibility(double version, boolean isCompatible) { + this.version = version; + this.isCompatible = isCompatible; + } + + public double getVersion() { + return version; + } + + public boolean isCompatible() { + return isCompatible; + } + + } + +} diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopier.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopier.java new file mode 100644 index 000000000000..3ae154c4bc1d --- /dev/null +++ b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopier.java @@ -0,0 +1,142 @@ +package io.airbyte.integrations.destination.databricks; + +import com.amazonaws.services.s3.AmazonS3; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.airbyte.db.jdbc.JdbcDatabase; +import io.airbyte.integrations.destination.ExtendedNameTransformer; +import io.airbyte.integrations.destination.jdbc.SqlOperations; +import io.airbyte.integrations.destination.jdbc.copy.StreamCopier; +import io.airbyte.integrations.destination.jdbc.copy.s3.S3Config; +import io.airbyte.integrations.destination.s3.S3DestinationConfig; +import io.airbyte.integrations.destination.s3.parquet.S3ParquetFormatConfig; +import io.airbyte.integrations.destination.s3.parquet.S3ParquetWriter; +import io.airbyte.integrations.destination.s3.writer.S3WriterFactory; +import io.airbyte.protocol.models.AirbyteRecordMessage; +import io.airbyte.protocol.models.AirbyteStream; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import io.airbyte.protocol.models.DestinationSyncMode; 
+import java.sql.Timestamp; +import java.util.UUID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This implementation is similar to {@link io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier}. + * The difference is that this implementation creates Parquet staging files instead of CSV ones. + */ +public class DatabricksStreamCopier implements StreamCopier { + + private static final Logger LOGGER = LoggerFactory.getLogger(DatabricksStreamCopier.class); + private static final ObjectMapper MAPPER = new ObjectMapper(); + + private final AmazonS3 s3Client; + private final S3Config s3Config; + private final String tmpTableName; + private final DestinationSyncMode syncMode; + private final AirbyteStream stream; + private final JdbcDatabase db; + private final String database; + private final String streamName; + private final ExtendedNameTransformer nameTransformer; + private final DatabricksSqlOperations sqlOperations; + private final S3ParquetWriter parquetWriter; + + public DatabricksStreamCopier(String stagingFolder, + DestinationSyncMode syncMode, + String schema, + ConfiguredAirbyteStream configuredStream, + String streamName, + AmazonS3 s3Client, + JdbcDatabase db, + S3Config s3Config, + ExtendedNameTransformer nameTransformer, + SqlOperations sqlOperations, + S3WriterFactory writerFactory, + Timestamp uploadTime) throws Exception { + this.stream = configuredStream.getStream(); + this.syncMode = syncMode; + this.db = db; + this.database = schema; + this.streamName = streamName; + this.nameTransformer = nameTransformer; + this.sqlOperations = (DatabricksSqlOperations) sqlOperations; + this.tmpTableName = nameTransformer.getTmpTableName(streamName); + this.s3Client = s3Client; + this.s3Config = s3Config; + this.parquetWriter = (S3ParquetWriter) writerFactory + .create(getS3DestinationConfig(s3Config, stagingFolder), s3Client, configuredStream, uploadTime); + LOGGER.info(parquetWriter.parquetSchema.toString()); + } + + @Override + public void write(UUID id, AirbyteRecordMessage recordMessage) throws Exception { + parquetWriter.write(id, recordMessage); + } + + @Override + public void closeStagingUploader(boolean hasFailed) throws Exception { + parquetWriter.close(hasFailed); + } + + @Override + public void createDestinationSchema() throws Exception { + LOGGER.info("Creating database in destination if it doesn't exist: {}", database); + sqlOperations.createSchemaIfNotExists(db, database); + } + + @Override + public void createTemporaryTable() throws Exception { + LOGGER.info("Preparing tmp table in destination for stream: {}, database: {}, tmp table name: {}.", streamName, database, tmpTableName); + LOGGER.info(parquetWriter.parquetSchema.toString()); + sqlOperations.createTableIfNotExists(db, database, tmpTableName); + } + + @Override + public void copyStagingFileToTemporaryTable() throws Exception { + LOGGER.info("Starting copy to tmp table: {} in destination for stream: {}, database: {}.", tmpTableName, streamName, database); + // TODO: load data sql operation + LOGGER.info("Copy to tmp table {} in destination for stream {} complete.", tmpTableName, streamName); + } + + + @Override + public String createDestinationTable() throws Exception { + var destTableName = nameTransformer.getRawTableName(streamName); + LOGGER.info("Preparing table {} in destination.", destTableName); + sqlOperations.createTableIfNotExists(db, database, destTableName); + LOGGER.info("Table {} in destination prepared.", destTableName); + + return destTableName; + } + + @Override +
public String generateMergeStatement(String destTableName) { + LOGGER.info("Preparing to merge tmp table {} to dest table: {}, database: {}, in destination.", tmpTableName, destTableName, database); + var queries = new StringBuilder(); + if (syncMode.equals(DestinationSyncMode.OVERWRITE)) { + queries.append(sqlOperations.truncateTableQuery(db, database, destTableName)); + LOGGER.info("Destination OVERWRITE mode detected. Dest table: {}, database: {}, truncated.", destTableName, database); + } + queries.append(sqlOperations.copyTableQuery(db, database, tmpTableName, destTableName)); + return queries.toString(); + } + + @Override + public void removeFileAndDropTmpTable() throws Exception { + + } + + private S3DestinationConfig getS3DestinationConfig(S3Config s3Config, String stagingFolder) { + return new S3DestinationConfig( + s3Config.getEndpoint(), + s3Config.getBucketName(), + stagingFolder, + s3Config.getRegion(), + s3Config.getAccessKeyId(), + s3Config.getSecretAccessKey(), + // use default parquet format config + new S3ParquetFormatConfig(MAPPER.createObjectNode()) + ); + } + +} diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopierFactory.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopierFactory.java new file mode 100644 index 000000000000..096e134bd223 --- /dev/null +++ b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopierFactory.java @@ -0,0 +1,45 @@ +package io.airbyte.integrations.destination.databricks; + +import com.amazonaws.services.s3.AmazonS3; +import io.airbyte.db.jdbc.JdbcDatabase; +import io.airbyte.integrations.destination.ExtendedNameTransformer; +import io.airbyte.integrations.destination.jdbc.SqlOperations; +import io.airbyte.integrations.destination.jdbc.copy.StreamCopier; +import io.airbyte.integrations.destination.jdbc.copy.StreamCopierFactory; +import io.airbyte.integrations.destination.jdbc.copy.s3.S3Config; +import io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier; +import io.airbyte.integrations.destination.s3.parquet.S3ParquetWriter; +import io.airbyte.integrations.destination.s3.writer.ProductionWriterFactory; +import io.airbyte.integrations.destination.s3.writer.S3WriterFactory; +import io.airbyte.protocol.models.AirbyteStream; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import io.airbyte.protocol.models.DestinationSyncMode; +import java.sql.Timestamp; + +public class DatabricksStreamCopierFactory implements StreamCopierFactory { + + @Override + public StreamCopier create(String configuredSchema, + S3Config s3Config, + String stagingFolder, + ConfiguredAirbyteStream configuredStream, + ExtendedNameTransformer nameTransformer, + JdbcDatabase db, + SqlOperations sqlOperations) { + try { + AirbyteStream stream = configuredStream.getStream(); + DestinationSyncMode syncMode = configuredStream.getDestinationSyncMode(); + String schema = StreamCopierFactory.getSchema(stream, configuredSchema, nameTransformer); + AmazonS3 s3Client = S3StreamCopier.getAmazonS3(s3Config); + S3WriterFactory writerFactory = new ProductionWriterFactory(); + Timestamp uploadTimestamp = new Timestamp(System.currentTimeMillis()); + + return new DatabricksStreamCopier( + stagingFolder, syncMode, schema, configuredStream, stream.getName(), s3Client, db, s3Config, nameTransformer, 
sqlOperations, writerFactory, uploadTimestamp); + } catch (Exception e) { + throw new RuntimeException(e); + } + + } + +} diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-databricks/src/main/resources/spec.json new file mode 100644 index 000000000000..f9a10ca18b85 --- /dev/null +++ b/airbyte-integrations/connectors/destination-databricks/src/main/resources/spec.json @@ -0,0 +1,102 @@ +{ + "documentationUrl": "https://docs.airbyte.io/integrations/destinations/databricks", + "supportsIncremental": false, + "supportsNormalization": false, + "supportsDBT": false, + "supported_destination_sync_modes": ["overwrite"], + "connectionSpecification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Databricks Destination Spec", + "type": "object", + "required": [ + "serverHostname", + "httpPath", + "pat" + ], + "additionalProperties": false, + "properties": { + "serverHostname": { + "title": "Server Hostname", + "type": "string", + "description": "", + "examples": [""] + }, + "httpPath": { + "title": "HTTP Path", + "type": "string", + "description": "", + "examples": [""] + }, + "pat": { + "title": "Personal Access Token", + "type": "string", + "description": "", + "examples": [""], + "airbyte_secret": true + }, + "schema": { + "title": "Database", + "type": "string", + "description": "" + }, + "s3_bucket_name": { + "title": "S3 Bucket Name", + "type": "string", + "description": "The name of the S3 bucket to use for intermediate staging of the data.", + "examples": ["airbyte.staging"] + }, + "s3_bucket_region": { + "title": "S3 Bucket Region", + "type": "string", + "default": "", + "description": "The region of the S3 staging bucket to use if utilising a copy strategy.", + "enum": [ + "", + "us-east-1", + "us-east-2", + "us-west-1", + "us-west-2", + "af-south-1", + "ap-east-1", + "ap-south-1", + "ap-northeast-1", + "ap-northeast-2", + "ap-northeast-3", + "ap-southeast-1", + "ap-southeast-2", + "ca-central-1", + "cn-north-1", + "cn-northwest-1", + "eu-central-1", + "eu-north-1", + "eu-south-1", + "eu-west-1", + "eu-west-2", + "eu-west-3", + "sa-east-1", + "me-south-1" + ] + }, + "access_key_id": { + "type": "string", + "description": "The Access Key Id that grants access to the above S3 staging bucket. Airbyte requires Read and Write permissions to the given bucket.", + "title": "S3 Key Id", + "airbyte_secret": true + }, + "secret_access_key": { + "type": "string", + "description": "The corresponding secret to the above access key id.", + "title": "S3 Access Key", + "airbyte_secret": true + }, + "part_size": { + "type": "integer", + "minimum": 10, + "maximum": 100, + "examples": ["10"], + "description": "Optional. Increase this if syncing tables larger than 100GB. Files are streamed to S3 in parts. This determines the size of each part, in MBs. As S3 has a limit of 10,000 parts per file, the part size limits the maximum table size. This is 10MB by default, resulting in a default limit of 100GB tables. Note that a larger part size will result in larger memory requirements. A rule of thumb is to multiply the part size by 10 to get the memory requirement.
Modify this with care.", + "title": "Stream Part Size" + } + } + } +} diff --git a/airbyte-integrations/connectors/destination-databricks/src/test-integration/java/io/airbyte/integrations/destination/databricks/DatabricksDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-databricks/src/test-integration/java/io/airbyte/integrations/destination/databricks/DatabricksDestinationAcceptanceTest.java new file mode 100644 index 000000000000..84aec1d1c0a5 --- /dev/null +++ b/airbyte-integrations/connectors/destination-databricks/src/test-integration/java/io/airbyte/integrations/destination/databricks/DatabricksDestinationAcceptanceTest.java @@ -0,0 +1,82 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.databricks; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest; +import java.io.IOException; +import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DatabricksDestinationAcceptanceTest extends DestinationAcceptanceTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(DatabricksDestinationAcceptanceTest.class); + + private JsonNode configJson; + + @Override + protected String getImageName() { + return "airbyte/destination-databricks:dev"; + } + + @Override + protected JsonNode getConfig() { + // TODO: Generate the configuration JSON file to be used for running the destination during the test + // configJson can either be static and read from secrets/config.json directly + // or created in the setup method + return configJson; + } + + @Override + protected JsonNode getFailCheckConfig() { + // TODO return an invalid config which, when used to run the connector's check connection operation, + // should result in a failed connection check + return null; + } + + @Override + protected List retrieveRecords(TestDestinationEnv testEnv, + String streamName, + String namespace, + JsonNode streamSchema) + throws IOException { + // TODO Implement this method to retrieve records which written to the destination by the connector. 
+ // Records returned from this method will be compared against records provided to the connector + // to verify they were written correctly + return null; + } + + @Override + protected void setup(TestDestinationEnv testEnv) { + // TODO Implement this method to run any setup actions needed before every test case + } + + @Override + protected void tearDown(TestDestinationEnv testEnv) { + // TODO Implement this method to run any cleanup actions needed after every test case + } + +} diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyConsumerFactory.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyConsumerFactory.java index 9dcb075575bb..a11d341b9144 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyConsumerFactory.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyConsumerFactory.java @@ -24,7 +24,6 @@ package io.airbyte.integrations.destination.jdbc.copy; -import io.airbyte.commons.json.Jsons; import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.integrations.base.AirbyteMessageConsumer; import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; @@ -37,8 +36,6 @@ import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteRecordMessage; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; -import java.sql.Timestamp; -import java.time.Instant; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -94,8 +91,7 @@ private static Map createWrite for (var configuredStream : catalog.getStreams()) { var stream = configuredStream.getStream(); var pair = AirbyteStreamNameNamespacePair.fromAirbyteSteam(stream); - var syncMode = configuredStream.getDestinationSyncMode(); - var copier = streamCopierFactory.create(defaultSchema, config, stagingFolder, syncMode, stream, namingResolver, database, sqlOperations); + var copier = streamCopierFactory.create(defaultSchema, config, stagingFolder, configuredStream, namingResolver, database, sqlOperations); pairToCopier.put(pair, copier); } @@ -116,8 +112,7 @@ private static RecordWriter recordWriterFunction(Map { @@ -35,10 +36,17 @@ public interface StreamCopierFactory { StreamCopier create(String configuredSchema, T config, String stagingFolder, - DestinationSyncMode syncMode, - AirbyteStream stream, + ConfiguredAirbyteStream configuredStream, ExtendedNameTransformer nameTransformer, JdbcDatabase db, SqlOperations sqlOperations); + static String getSchema(AirbyteStream stream, String configuredSchema, ExtendedNameTransformer nameTransformer) { + if (stream.getNamespace() != null) { + return nameTransformer.convertStreamName(stream.getNamespace()); + } else { + return nameTransformer.convertStreamName(configuredSchema); + } + } + } diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.java index c74ee13e8389..69fe6dddff78 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.java +++ 
b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.java @@ -30,10 +30,12 @@ import com.google.cloud.storage.BlobInfo; import com.google.cloud.storage.Storage; import com.google.cloud.storage.StorageOptions; +import io.airbyte.commons.json.Jsons; import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.integrations.destination.ExtendedNameTransformer; import io.airbyte.integrations.destination.jdbc.SqlOperations; import io.airbyte.integrations.destination.jdbc.copy.StreamCopier; +import io.airbyte.protocol.models.AirbyteRecordMessage; import io.airbyte.protocol.models.DestinationSyncMode; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -44,6 +46,7 @@ import java.nio.charset.StandardCharsets; import java.sql.SQLException; import java.sql.Timestamp; +import java.time.Instant; import java.util.UUID; import org.apache.commons.csv.CSVFormat; import org.apache.commons.csv.CSVPrinter; @@ -103,8 +106,10 @@ public GcsStreamCopier(String stagingFolder, } @Override - public void write(UUID id, String jsonDataString, Timestamp emittedAt) throws Exception { - csvPrinter.printRecord(id, jsonDataString, emittedAt); + public void write(UUID id, AirbyteRecordMessage recordMessage) throws Exception { + csvPrinter.printRecord(id, + Jsons.serialize(recordMessage.getData()), + Timestamp.from(Instant.ofEpochMilli(recordMessage.getEmittedAt()))); } @Override diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopierFactory.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopierFactory.java index 594eaf85285f..eb2d44af8235 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopierFactory.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopierFactory.java @@ -28,12 +28,12 @@ import com.google.cloud.storage.Storage; import com.google.cloud.storage.StorageOptions; import io.airbyte.db.jdbc.JdbcDatabase; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.destination.ExtendedNameTransformer; import io.airbyte.integrations.destination.jdbc.SqlOperations; import io.airbyte.integrations.destination.jdbc.copy.StreamCopier; import io.airbyte.integrations.destination.jdbc.copy.StreamCopierFactory; import io.airbyte.protocol.models.AirbyteStream; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; import io.airbyte.protocol.models.DestinationSyncMode; import java.io.ByteArrayInputStream; import java.io.InputStream; @@ -47,14 +47,14 @@ public abstract class GcsStreamCopierFactory implements StreamCopierFactory { @@ -43,17 +43,17 @@ public abstract class S3StreamCopierFactory implements StreamCopierFactory parquetWriter; private final AvroRecordFactory avroRecordFactory; + public final Schema parquetSchema; public S3ParquetWriter(S3DestinationConfig config, AmazonS3 s3Client, @@ -88,6 +89,7 @@ public S3ParquetWriter(S3DestinationConfig config, .withDictionaryEncoding(formatConfig.isDictionaryEncoding()) .build(); this.avroRecordFactory = new AvroRecordFactory(schema, nameUpdater); + this.parquetSchema = schema; } public static Configuration getHadoopConfig(S3DestinationConfig config) { diff --git a/docs/integrations/destinations/databricks.md 
b/docs/integrations/destinations/databricks.md new file mode 100644 index 000000000000..d2570854d288 --- /dev/null +++ b/docs/integrations/destinations/databricks.md @@ -0,0 +1,52 @@ +# Databricks + +TODO: update this doc + +## Sync overview + +### Output schema + +Is the output schema fixed (e.g: for an API like Stripe)? If so, point to the connector's schema (e.g: link to Stripe’s documentation) or describe the schema here directly (e.g: include a diagram or paragraphs describing the schema). + +Describe how the connector's schema is mapped to Airbyte concepts. An example description might be: "MagicDB tables become Airbyte Streams and MagicDB columns become Airbyte Fields. In addition, an extracted\_at column is appended to each row being read." + +### Data type mapping + +This section should contain a table mapping each of the connector's data types to Airbyte types. At the moment, Airbyte uses the same types used by [JSONSchema](https://json-schema.org/understanding-json-schema/reference/index.html). `string`, `date-time`, `object`, `array`, `boolean`, `integer`, and `number` are the most commonly used data types. + +| Integration Type | Airbyte Type | Notes | +| :--- | :--- | :--- | + + +### Features + +This section should contain a table with the following format: + +| Feature | Supported?(Yes/No) | Notes | +| :--- | :--- | :--- | +| Full Refresh Sync | | | +| Incremental Sync | | | +| Replicate Incremental Deletes | | | +| For databases, WAL/Logical replication | | | +| SSL connection | | | +| SSH Tunnel Support | | | +| (Any other source-specific features) | | | + +### Performance considerations + +Could this connector hurt the user's database/API/etc... or put too much strain on it in certain circumstances? For example, if there are a lot of tables or rows in a table? What is the breaking point (e.g: 100mm> records)? What can the user do to prevent this? (e.g: use a read-only replica, or schedule frequent syncs, etc..) + +## Getting started + +### Requirements + +* What versions of this connector does this implementation support? (e.g: `postgres v3.14 and above`) +* What configurations, if any, are required on the connector? (e.g: `buffer_size > 1024`) +* Network accessibility requirements +* Credentials/authentication requirements? (e.g: A DB user with read permissions on certain tables) + +### Setup guide + +For each of the above high-level requirements as appropriate, add or point to a follow-along guide. See existing source or destination guides for an example. + +For each major cloud provider we support, also add a follow-along guide for setting up Airbyte to connect to that destination. See the Postgres destination guide for an example of what this should look like.
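+
+As a hedged starting point for the setup guide (to be replaced when this doc is fleshed out): the connector code currently builds its JDBC connection from the `serverHostname`, `httpPath`, and `pat` fields in `spec.json`, using the Simba Spark driver (`com.simba.spark.jdbc.Driver`) with `token` as the username and the personal access token as the password (see `DatabricksDestination#getDatabase`):
+
+```
+jdbc:spark://<serverHostname>:443/default;transportMode=http;ssl=1;httpPath=<httpPath>
+```
+
+The S3 fields in the spec (`s3_bucket_name`, `s3_bucket_region`, `access_key_id`, `secret_access_key`, `part_size`) configure the bucket where the connector stages Parquet files before loading them into Databricks.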