From 679ddf4580b9f7dc8aac3e0492b78cc4a1b821fa Mon Sep 17 00:00:00 2001 From: LiRen Tu Date: Tue, 14 Sep 2021 16:54:05 -0700 Subject: [PATCH] Revert "Add skeleton for databricks destination (#5629)" (#6066) This reverts commit 79256c46b541ffcdd88a8589dde75954aea7d838. --- .../destination-databricks/.dockerignore | 3 - .../destination-databricks/Dockerfile | 11 -- .../destination-databricks/README.md | 68 ------- .../destination-databricks/build.gradle | 29 --- .../databricks/DatabricksDestination.java | 91 --------- .../databricks/DatabricksNameTransformer.java | 86 --------- .../databricks/DatabricksSqlOperations.java | 178 ------------------ .../databricks/DatabricksStreamCopier.java | 142 -------------- .../DatabricksStreamCopierFactory.java | 45 ----- .../src/main/resources/spec.json | 102 ---------- .../DatabricksDestinationAcceptanceTest.java | 82 -------- .../jdbc/copy/CopyConsumerFactory.java | 9 +- .../destination/jdbc/copy/StreamCopier.java | 3 +- .../jdbc/copy/StreamCopierFactory.java | 12 +- .../jdbc/copy/gcs/GcsStreamCopier.java | 9 +- .../jdbc/copy/gcs/GcsStreamCopierFactory.java | 20 +- .../jdbc/copy/s3/S3StreamCopier.java | 11 +- .../jdbc/copy/s3/S3StreamCopierFactory.java | 22 ++- .../s3/parquet/S3ParquetWriter.java | 2 - docs/integrations/destinations/databricks.md | 52 ----- 20 files changed, 44 insertions(+), 933 deletions(-) delete mode 100644 airbyte-integrations/connectors/destination-databricks/.dockerignore delete mode 100644 airbyte-integrations/connectors/destination-databricks/Dockerfile delete mode 100644 airbyte-integrations/connectors/destination-databricks/README.md delete mode 100644 airbyte-integrations/connectors/destination-databricks/build.gradle delete mode 100644 airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestination.java delete mode 100644 airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksNameTransformer.java delete mode 100644 airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksSqlOperations.java delete mode 100644 airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopier.java delete mode 100644 airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopierFactory.java delete mode 100644 airbyte-integrations/connectors/destination-databricks/src/main/resources/spec.json delete mode 100644 airbyte-integrations/connectors/destination-databricks/src/test-integration/java/io/airbyte/integrations/destination/databricks/DatabricksDestinationAcceptanceTest.java delete mode 100644 docs/integrations/destinations/databricks.md diff --git a/airbyte-integrations/connectors/destination-databricks/.dockerignore b/airbyte-integrations/connectors/destination-databricks/.dockerignore deleted file mode 100644 index 65c7d0ad3e73..000000000000 --- a/airbyte-integrations/connectors/destination-databricks/.dockerignore +++ /dev/null @@ -1,3 +0,0 @@ -* -!Dockerfile -!build diff --git a/airbyte-integrations/connectors/destination-databricks/Dockerfile b/airbyte-integrations/connectors/destination-databricks/Dockerfile deleted file mode 100644 index 4c3d7fc644ca..000000000000 --- a/airbyte-integrations/connectors/destination-databricks/Dockerfile +++ /dev/null @@ -1,11 +0,0 @@ -FROM 
airbyte/integration-base-java:dev - -WORKDIR /airbyte -ENV APPLICATION destination-databricks - -COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar - -RUN tar xf ${APPLICATION}.tar --strip-components=1 - -LABEL io.airbyte.version=0.1.0 -LABEL io.airbyte.name=airbyte/destination-databricks diff --git a/airbyte-integrations/connectors/destination-databricks/README.md b/airbyte-integrations/connectors/destination-databricks/README.md deleted file mode 100644 index 70c175071357..000000000000 --- a/airbyte-integrations/connectors/destination-databricks/README.md +++ /dev/null @@ -1,68 +0,0 @@ -# Destination Databricks - -This is the repository for the Databricks destination connector in Java. -For information about how to use this connector within Airbyte, see [the User Documentation](https://docs.airbyte.io/integrations/destinations/databricks). - -## Local development - -#### Building via Gradle -From the Airbyte repository root, run: -``` -./gradlew :airbyte-integrations:connectors:destination-databricks:build -``` - -#### Create credentials -**If you are a community contributor**, generate the necessary credentials and place them in `secrets/config.json` conforming to the spec file in `src/main/resources/spec.json`. -Note that the `secrets` directory is git-ignored by default, so there is no danger of accidentally checking in sensitive information. - -**If you are an Airbyte core member**, follow the [instructions](https://docs.airbyte.io/connector-development#using-credentials-in-ci) to set up the credentials. - -### Locally running the connector docker image - -#### Build -Build the connector image via Gradle: -``` -./gradlew :airbyte-integrations:connectors:destination-databricks:airbyteDocker -``` -When building via Gradle, the docker image name and tag, respectively, are the values of the `io.airbyte.name` and `io.airbyte.version` `LABEL`s in -the Dockerfile. - -#### Run -Then run any of the connector commands as follows: -``` -docker run --rm airbyte/destination-databricks:dev spec -docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-databricks:dev check --config /secrets/config.json -docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-databricks:dev discover --config /secrets/config.json -docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/destination-databricks:dev read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json -``` - -## Testing -We use `JUnit` for Java tests. - -### Unit and Integration Tests -Place unit tests under `src/test/io/airbyte/integrations/destinations/databricks`. - -#### Acceptance Tests -Airbyte has a standard test suite that all destination connectors must pass. Implement the `TODO`s in -`src/test-integration/java/io/airbyte/integrations/destinations/databricksDestinationAcceptanceTest.java`. - -### Using gradle to run tests -All commands should be run from airbyte project root. -To run unit tests: -``` -./gradlew :airbyte-integrations:connectors:destination-databricks:unitTest -``` -To run acceptance and custom integration tests: -``` -./gradlew :airbyte-integrations:connectors:destination-databricks:integrationTest -``` - -## Dependency Management - -### Publishing a new version of the connector -You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what? -1. Make sure your changes are passing unit and integration tests. -1. 
Bump the connector version in `Dockerfile` -- just increment the value of the `LABEL io.airbyte.version` appropriately (we use [SemVer](https://semver.org/)). -1. Create a Pull Request. -1. Pat yourself on the back for being an awesome contributor. -1. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master. diff --git a/airbyte-integrations/connectors/destination-databricks/build.gradle b/airbyte-integrations/connectors/destination-databricks/build.gradle deleted file mode 100644 index f260d1903cc8..000000000000 --- a/airbyte-integrations/connectors/destination-databricks/build.gradle +++ /dev/null @@ -1,29 +0,0 @@ -plugins { - id 'application' - id 'airbyte-docker' - id 'airbyte-integration-test-java' -} - -application { - mainClass = 'io.airbyte.integrations.destination.databricks.DatabricksDestination' -} - -dependencies { - implementation project(':airbyte-db:lib') - implementation project(':airbyte-config:models') - implementation project(':airbyte-protocol:models') - implementation project(':airbyte-integrations:bases:base-java') - implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs) - implementation project(':airbyte-integrations:connectors:destination-jdbc') - implementation project(':airbyte-integrations:connectors:destination-s3') - - // parquet - implementation group: 'org.apache.hadoop', name: 'hadoop-common', version: '3.3.0' - implementation group: 'org.apache.hadoop', name: 'hadoop-aws', version: '3.3.0' - implementation group: 'org.apache.hadoop', name: 'hadoop-mapreduce-client-core', version: '3.3.0' - implementation group: 'org.apache.parquet', name: 'parquet-avro', version: '1.12.0' - implementation group: 'tech.allegro.schema.json2avro', name: 'converter', version: '0.2.10' - - integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test') - integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-databricks') -} diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestination.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestination.java deleted file mode 100644 index f90fc2353271..000000000000 --- a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestination.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 Airbyte - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package io.airbyte.integrations.destination.databricks; - -import com.fasterxml.jackson.databind.JsonNode; -import io.airbyte.db.Databases; -import io.airbyte.db.jdbc.JdbcDatabase; -import io.airbyte.integrations.base.AirbyteMessageConsumer; -import io.airbyte.integrations.base.IntegrationRunner; -import io.airbyte.integrations.destination.ExtendedNameTransformer; -import io.airbyte.integrations.destination.jdbc.SqlOperations; -import io.airbyte.integrations.destination.jdbc.copy.CopyConsumerFactory; -import io.airbyte.integrations.destination.jdbc.copy.CopyDestination; -import io.airbyte.integrations.destination.jdbc.copy.s3.S3Config; -import io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier; -import io.airbyte.protocol.models.AirbyteMessage; -import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; -import java.util.function.Consumer; - -public class DatabricksDestination extends CopyDestination { - - private static final String DRIVER_CLASS = "com.simba.spark.jdbc.Driver"; - - public static void main(String[] args) throws Exception { - new IntegrationRunner(new DatabricksDestination()).run(args); - } - - @Override - public AirbyteMessageConsumer getConsumer(JsonNode config, ConfiguredAirbyteCatalog catalog, Consumer outputRecordCollector) { - return CopyConsumerFactory.create( - outputRecordCollector, - getDatabase(config), - getSqlOperations(), - getNameTransformer(), - S3Config.getS3Config(config), - catalog, - new DatabricksStreamCopierFactory(), - config.get("schema").asText().equals("") ? 
"default" : config.get("schema").asText() - ); - } - - @Override - public void checkPersistence(JsonNode config) { - S3StreamCopier.attemptS3WriteAndDelete(S3Config.getS3Config(config)); - } - - @Override - public ExtendedNameTransformer getNameTransformer() { - return new DatabricksNameTransformer(); - } - - @Override - public JdbcDatabase getDatabase(JsonNode databricksConfig) { - return Databases.createJdbcDatabase( - "token", - databricksConfig.get("pat").asText(), - String.format("jdbc:spark://%s:443/default;transportMode=http;ssl=1;httpPath=%s", - databricksConfig.get("serverHostname").asText(), - databricksConfig.get("httpPath").asText()), - DRIVER_CLASS - ); - } - - @Override - public SqlOperations getSqlOperations() { - return new DatabricksSqlOperations(); - } - -} diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksNameTransformer.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksNameTransformer.java deleted file mode 100644 index 141b2b4e29c9..000000000000 --- a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksNameTransformer.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 Airbyte - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package io.airbyte.integrations.destination.databricks; - -import io.airbyte.integrations.destination.ExtendedNameTransformer; - -/** - * TODO: Replace below MySQL docstring with Databricks equiv. - * - * Note that MySQL documentation discusses about identifiers case sensitivity using the - * lower_case_table_names system variable. As one of their recommendation is: "It is best to adopt a - * consistent convention, such as always creating and referring to databases and tables using - * lowercase names. This convention is recommended for maximum portability and ease of use. - * - * Source: https://dev.mysql.com/doc/refman/8.0/en/identifier-case-sensitivity.html" - * - * As a result, we are here forcing all identifier (table, schema and columns) names to lowercase. 
- */ -public class DatabricksNameTransformer extends ExtendedNameTransformer { - - // These constants must match those in destination_name_transformer.py - public static final int MAX_MYSQL_NAME_LENGTH = 64; - // DBT appends a suffix to table names - public static final int TRUNCATE_DBT_RESERVED_SIZE = 12; - // 4 charachters for 1 underscore and 3 suffix (e.g. _ab1) - // 4 charachters for 1 underscore and 3 schema hash - public static final int TRUNCATE_RESERVED_SIZE = 8; - public static final int TRUNCATION_MAX_NAME_LENGTH = MAX_MYSQL_NAME_LENGTH - TRUNCATE_DBT_RESERVED_SIZE - TRUNCATE_RESERVED_SIZE; - - @Override - public String getIdentifier(String name) { - String identifier = applyDefaultCase(super.getIdentifier(name)); - return truncateName(identifier, TRUNCATION_MAX_NAME_LENGTH); - } - - @Override - public String getTmpTableName(String streamName) { - String tmpTableName = applyDefaultCase(super.getTmpTableName(streamName)); - return truncateName(tmpTableName, TRUNCATION_MAX_NAME_LENGTH); - } - - @Override - public String getRawTableName(String streamName) { - String rawTableName = applyDefaultCase(super.getRawTableName(streamName)); - return truncateName(rawTableName, TRUNCATION_MAX_NAME_LENGTH); - } - - static String truncateName(String name, int maxLength) { - if (name.length() <= maxLength) { - return name; - } - - int allowedLength = maxLength - 2; - String prefix = name.substring(0, allowedLength / 2); - String suffix = name.substring(name.length() - allowedLength / 2); - return prefix + "__" + suffix; - } - - @Override - protected String applyDefaultCase(String input) { - return input.toLowerCase(); - } - -} diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksSqlOperations.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksSqlOperations.java deleted file mode 100644 index 31ba481c8f67..000000000000 --- a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksSqlOperations.java +++ /dev/null @@ -1,178 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 Airbyte - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. 
- */ - -package io.airbyte.integrations.destination.databricks; - -import com.fasterxml.jackson.databind.JsonNode; -import io.airbyte.db.jdbc.JdbcDatabase; -import io.airbyte.integrations.base.JavaBaseConstants; -import io.airbyte.integrations.destination.StandardNameTransformer; -import io.airbyte.integrations.destination.jdbc.JdbcSqlOperations; -import io.airbyte.protocol.models.AirbyteRecordMessage; -import java.io.File; -import java.io.IOException; -import java.nio.file.Files; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.List; -import java.util.stream.Collectors; - -public class DatabricksSqlOperations extends JdbcSqlOperations { - - private boolean isLocalFileEnabled = false; - - @Override - public void executeTransaction(JdbcDatabase database, List queries) throws Exception { - database.executeWithinTransaction(queries); - } - - @Override - public void insertRecordsInternal(JdbcDatabase database, - List records, - String schemaName, - String tmpTableName) - throws SQLException { - if (records.isEmpty()) { - return; - } - - verifyLocalFileEnabled(database); - try { - File tmpFile = Files.createTempFile(tmpTableName + "-", ".tmp").toFile(); - - loadDataIntoTable(database, records, schemaName, tmpTableName, tmpFile); - - Files.delete(tmpFile.toPath()); - } catch (IOException e) { - throw new SQLException(e); - } - } - - private void loadDataIntoTable(JdbcDatabase database, - List records, - String schemaName, - String tmpTableName, - File tmpFile) - throws SQLException { - database.execute(connection -> { - try { - writeBatchToFile(tmpFile, records); - - String absoluteFile = "'" + tmpFile.getAbsolutePath() + "'"; - - String query = String.format( - "LOAD DATA LOCAL INFILE %s INTO TABLE %s.%s FIELDS TERMINATED BY ',' ENCLOSED BY '\"' ESCAPED BY '\\\"' LINES TERMINATED BY '\\r\\n'", - absoluteFile, schemaName, tmpTableName); - - try (Statement stmt = connection.createStatement()) { - stmt.execute(query); - } - } catch (Exception e) { - throw new RuntimeException(e); - } - }); - } - - @Override - protected JsonNode formatData(JsonNode data) { - return StandardNameTransformer.formatJsonPath(data); - } - - void verifyLocalFileEnabled(JdbcDatabase database) throws SQLException { - boolean localFileEnabled = isLocalFileEnabled || checkIfLocalFileIsEnabled(database); - if (!localFileEnabled) { - tryEnableLocalFile(database); - } - isLocalFileEnabled = true; - } - - private void tryEnableLocalFile(JdbcDatabase database) throws SQLException { - database.execute(connection -> { - try (Statement statement = connection.createStatement()) { - statement.execute("set global local_infile=true"); - } catch (Exception e) { - throw new RuntimeException( - "The DB user provided to airbyte was unable to switch on the local_infile attribute on the MySQL server. 
As an admin user, you will need to run \"SET GLOBAL local_infile = true\" before syncing data with Airbyte.", - e); - } - }); - } - - private double getVersion(JdbcDatabase database) throws SQLException { - List value = database.resultSetQuery(connection -> connection.createStatement().executeQuery("select version()"), - resultSet -> resultSet.getString("version()")).collect(Collectors.toList()); - return Double.parseDouble(value.get(0).substring(0, 3)); - } - - VersionCompatibility isCompatibleVersion(JdbcDatabase database) throws SQLException { - double version = getVersion(database); - return new VersionCompatibility(version, version >= 5.7); - } - - @Override - public boolean isSchemaRequired() { - return false; - } - - private boolean checkIfLocalFileIsEnabled(JdbcDatabase database) throws SQLException { - List value = database.resultSetQuery(connection -> connection.createStatement().executeQuery("SHOW GLOBAL VARIABLES LIKE 'local_infile'"), - resultSet -> resultSet.getString("Value")).collect(Collectors.toList()); - - return value.get(0).equalsIgnoreCase("on"); - } - - @Override - public String createTableQuery(JdbcDatabase database, String schemaName, String tableName) { - // MySQL requires byte information with VARCHAR. Since we are using uuid as value for the column, - // 256 is enough - return String.format( - "CREATE TABLE IF NOT EXISTS %s.%s ( \n" - + "%s VARCHAR(256) PRIMARY KEY,\n" - + "%s JSON,\n" - + "%s TIMESTAMP(6) DEFAULT CURRENT_TIMESTAMP(6)\n" - + ");\n", - schemaName, tableName, JavaBaseConstants.COLUMN_NAME_AB_ID, JavaBaseConstants.COLUMN_NAME_DATA, JavaBaseConstants.COLUMN_NAME_EMITTED_AT); - } - - public static class VersionCompatibility { - - private final double version; - private final boolean isCompatible; - - public VersionCompatibility(double version, boolean isCompatible) { - this.version = version; - this.isCompatible = isCompatible; - } - - public double getVersion() { - return version; - } - - public boolean isCompatible() { - return isCompatible; - } - - } - -} diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopier.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopier.java deleted file mode 100644 index 3ae154c4bc1d..000000000000 --- a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopier.java +++ /dev/null @@ -1,142 +0,0 @@ -package io.airbyte.integrations.destination.databricks; - -import com.amazonaws.services.s3.AmazonS3; -import com.fasterxml.jackson.databind.ObjectMapper; -import io.airbyte.db.jdbc.JdbcDatabase; -import io.airbyte.integrations.destination.ExtendedNameTransformer; -import io.airbyte.integrations.destination.jdbc.SqlOperations; -import io.airbyte.integrations.destination.jdbc.copy.StreamCopier; -import io.airbyte.integrations.destination.jdbc.copy.s3.S3Config; -import io.airbyte.integrations.destination.s3.S3DestinationConfig; -import io.airbyte.integrations.destination.s3.parquet.S3ParquetFormatConfig; -import io.airbyte.integrations.destination.s3.parquet.S3ParquetWriter; -import io.airbyte.integrations.destination.s3.writer.S3WriterFactory; -import io.airbyte.protocol.models.AirbyteRecordMessage; -import io.airbyte.protocol.models.AirbyteStream; -import io.airbyte.protocol.models.ConfiguredAirbyteStream; -import io.airbyte.protocol.models.DestinationSyncMode; 
-import java.sql.Timestamp; -import java.util.UUID; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This implementation is similar to {@link io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier}. - * The difference is that this implementation creates Parquet staging files, instead of CSV ones. - */ -public class DatabricksStreamCopier implements StreamCopier { - - private static final Logger LOGGER = LoggerFactory.getLogger(DatabricksStreamCopier.class); - private static final ObjectMapper MAPPER = new ObjectMapper(); - - private final AmazonS3 s3Client; - private final S3Config s3Config; - private final String tmpTableName; - private final DestinationSyncMode syncMode; - private final AirbyteStream stream; - private final JdbcDatabase db; - private final String database; - private final String streamName; - private final ExtendedNameTransformer nameTransformer; - private final DatabricksSqlOperations sqlOperations; - private final S3ParquetWriter parquetWriter; - - public DatabricksStreamCopier(String stagingFolder, - DestinationSyncMode syncMode, - String schema, - ConfiguredAirbyteStream configuredStream, - String streamName, - AmazonS3 s3Client, - JdbcDatabase db, - S3Config s3Config, - ExtendedNameTransformer nameTransformer, - SqlOperations sqlOperations, - S3WriterFactory writerFactory, - Timestamp uploadTime) throws Exception { - this.stream = configuredStream.getStream(); - this.syncMode = syncMode; - this.db = db; - this.database = schema; - this.streamName = streamName; - this.nameTransformer = nameTransformer; - this.sqlOperations = (DatabricksSqlOperations) sqlOperations; - this.tmpTableName = nameTransformer.getTmpTableName(streamName); - this.s3Client = s3Client; - this.s3Config = s3Config; - this.parquetWriter = (S3ParquetWriter) writerFactory - .create(getS3DestinationConfig(s3Config, stagingFolder), s3Client, configuredStream, uploadTime); - LOGGER.info(parquetWriter.parquetSchema.toString()); - } - - @Override - public void write(UUID id, AirbyteRecordMessage recordMessage) throws Exception { - parquetWriter.write(id, recordMessage); - } - - @Override - public void closeStagingUploader(boolean hasFailed) throws Exception { - parquetWriter.close(hasFailed); - } - - @Override - public void createDestinationSchema() throws Exception { - LOGGER.info("Creating database in destination if it doesn't exist: {}", database); - sqlOperations.createSchemaIfNotExists(db, database); - } - - @Override - public void createTemporaryTable() throws Exception { - LOGGER.info("Preparing tmp table in destination for stream: {}, database: {}, tmp table name: {}.", streamName, database, tmpTableName); - LOGGER.info(parquetWriter.parquetSchema.toString()); - sqlOperations.createTableIfNotExists(db, database, tmpTableName); - } - - @Override - public void copyStagingFileToTemporaryTable() throws Exception { - LOGGER.info("Starting copy to tmp table: {} in destination for stream: {}, database: {}, .", tmpTableName, streamName, database); - // TODO: load data sql operation - LOGGER.info("Copy to tmp table {} in destination for stream {} complete.", tmpTableName, streamName); - } - - - @Override - public String createDestinationTable() throws Exception { - var destTableName = nameTransformer.getRawTableName(streamName); - LOGGER.info("Preparing table {} in destination.", destTableName); - sqlOperations.createTableIfNotExists(db, database, destTableName); - LOGGER.info("Table {} in destination prepared.", tmpTableName); - - return destTableName; - } - - @Override - 
public String generateMergeStatement(String destTableName) { - LOGGER.info("Preparing to merge tmp table {} to dest table: {}, database: {}, in destination.", tmpTableName, destTableName, database); - var queries = new StringBuilder(); - if (syncMode.equals(DestinationSyncMode.OVERWRITE)) { - queries.append(sqlOperations.truncateTableQuery(db, database, destTableName)); - LOGGER.info("Destination OVERWRITE mode detected. Dest table: {}, database: {}, truncated.", destTableName, database); - } - queries.append(sqlOperations.copyTableQuery(db, database, tmpTableName, destTableName)); - return queries.toString(); - } - - @Override - public void removeFileAndDropTmpTable() throws Exception { - - } - - private S3DestinationConfig getS3DestinationConfig(S3Config s3Config, String stagingFolder) { - return new S3DestinationConfig( - s3Config.getEndpoint(), - s3Config.getBucketName(), - stagingFolder, - s3Config.getRegion(), - s3Config.getAccessKeyId(), - s3Config.getSecretAccessKey(), - // use default parquet format config - new S3ParquetFormatConfig(MAPPER.createObjectNode()) - ); - } - -} diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopierFactory.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopierFactory.java deleted file mode 100644 index 096e134bd223..000000000000 --- a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopierFactory.java +++ /dev/null @@ -1,45 +0,0 @@ -package io.airbyte.integrations.destination.databricks; - -import com.amazonaws.services.s3.AmazonS3; -import io.airbyte.db.jdbc.JdbcDatabase; -import io.airbyte.integrations.destination.ExtendedNameTransformer; -import io.airbyte.integrations.destination.jdbc.SqlOperations; -import io.airbyte.integrations.destination.jdbc.copy.StreamCopier; -import io.airbyte.integrations.destination.jdbc.copy.StreamCopierFactory; -import io.airbyte.integrations.destination.jdbc.copy.s3.S3Config; -import io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier; -import io.airbyte.integrations.destination.s3.parquet.S3ParquetWriter; -import io.airbyte.integrations.destination.s3.writer.ProductionWriterFactory; -import io.airbyte.integrations.destination.s3.writer.S3WriterFactory; -import io.airbyte.protocol.models.AirbyteStream; -import io.airbyte.protocol.models.ConfiguredAirbyteStream; -import io.airbyte.protocol.models.DestinationSyncMode; -import java.sql.Timestamp; - -public class DatabricksStreamCopierFactory implements StreamCopierFactory { - - @Override - public StreamCopier create(String configuredSchema, - S3Config s3Config, - String stagingFolder, - ConfiguredAirbyteStream configuredStream, - ExtendedNameTransformer nameTransformer, - JdbcDatabase db, - SqlOperations sqlOperations) { - try { - AirbyteStream stream = configuredStream.getStream(); - DestinationSyncMode syncMode = configuredStream.getDestinationSyncMode(); - String schema = StreamCopierFactory.getSchema(stream, configuredSchema, nameTransformer); - AmazonS3 s3Client = S3StreamCopier.getAmazonS3(s3Config); - S3WriterFactory writerFactory = new ProductionWriterFactory(); - Timestamp uploadTimestamp = new Timestamp(System.currentTimeMillis()); - - return new DatabricksStreamCopier( - stagingFolder, syncMode, schema, configuredStream, stream.getName(), s3Client, db, s3Config, nameTransformer, 
sqlOperations, writerFactory, uploadTimestamp);
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-
-  }
-
-}
diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-databricks/src/main/resources/spec.json
deleted file mode 100644
index f9a10ca18b85..000000000000
--- a/airbyte-integrations/connectors/destination-databricks/src/main/resources/spec.json
+++ /dev/null
@@ -1,102 +0,0 @@
-{
-  "documentationUrl": "https://docs.airbyte.io/integrations/destinations/databricks",
-  "supportsIncremental": false,
-  "supportsNormalization": false,
-  "supportsDBT": false,
-  "supported_destination_sync_modes": ["overwrite"],
-  "connectionSpecification": {
-    "$schema": "http://json-schema.org/draft-07/schema#",
-    "title": "Databricks Destination Spec",
-    "type": "object",
-    "required": [
-      "serverHostname",
-      "httpPath",
-      "pat"
-    ],
-    "additionalProperties": false,
-    "properties": {
-      "serverHostname": {
-        "title": "Server Hostname",
-        "type": "string",
-        "description": "",
-        "examples": [""]
-      },
-      "httpPath": {
-        "title": "HTTP Path",
-        "type": "string",
-        "description": "",
-        "examples": [""]
-      },
-      "pat": {
-        "title": "Personal Access Token",
-        "type": "string",
-        "description": "",
-        "examples": [""],
-        "airbyte_secret": true
-      },
-      "schema": {
-        "title": "Database",
-        "type": "string",
-        "description": ""
-      },
-      "s3_bucket_name": {
-        "title": "S3 Bucket Name",
-        "type": "string",
-        "description": "The name of the S3 bucket to use for intermediate staging of the data.",
-        "examples": ["airbyte.staging"]
-      },
-      "s3_bucket_region": {
-        "title": "S3 Bucket Region",
-        "type": "string",
-        "default": "",
-        "description": "The region of the S3 staging bucket to use if utilising a copy strategy.",
-        "enum": [
-          "",
-          "us-east-1",
-          "us-east-2",
-          "us-west-1",
-          "us-west-2",
-          "af-south-1",
-          "ap-east-1",
-          "ap-south-1",
-          "ap-northeast-1",
-          "ap-northeast-2",
-          "ap-northeast-3",
-          "ap-southeast-1",
-          "ap-southeast-2",
-          "ca-central-1",
-          "cn-north-1",
-          "cn-northwest-1",
-          "eu-central-1",
-          "eu-north-1",
-          "eu-south-1",
-          "eu-west-1",
-          "eu-west-2",
-          "eu-west-3",
-          "sa-east-1",
-          "me-south-1"
-        ]
-      },
-      "access_key_id": {
-        "type": "string",
-        "description": "The Access Key ID granting access to the above S3 staging bucket. Airbyte requires Read and Write permissions on the given bucket.",
-        "title": "S3 Key Id",
-        "airbyte_secret": true
-      },
-      "secret_access_key": {
-        "type": "string",
-        "description": "The secret corresponding to the above Access Key ID.",
-        "title": "S3 Access Key",
-        "airbyte_secret": true
-      },
-      "part_size": {
-        "type": "integer",
-        "minimum": 10,
-        "maximum": 100,
-        "examples": ["10"],
-        "description": "Optional. Increase this if syncing tables larger than 100GB. Files are streamed to S3 in parts. This determines the size of each part, in MB. Because S3 allows at most 10,000 parts per file, the part size caps the maximum table size: the default of 10MB yields a 100GB limit per table. Note that a larger part size also increases memory requirements; a rule of thumb is to multiply the part size by 10 to get the memory requirement. Modify this with care.",
-        "title": "Stream Part Size"
-      }
-    }
-  }
-}
diff --git a/airbyte-integrations/connectors/destination-databricks/src/test-integration/java/io/airbyte/integrations/destination/databricks/DatabricksDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-databricks/src/test-integration/java/io/airbyte/integrations/destination/databricks/DatabricksDestinationAcceptanceTest.java
deleted file mode 100644
index 84aec1d1c0a5..000000000000
--- a/airbyte-integrations/connectors/destination-databricks/src/test-integration/java/io/airbyte/integrations/destination/databricks/DatabricksDestinationAcceptanceTest.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2020 Airbyte
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package io.airbyte.integrations.destination.databricks;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
-import java.io.IOException;
-import java.util.List;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class DatabricksDestinationAcceptanceTest extends DestinationAcceptanceTest {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(DatabricksDestinationAcceptanceTest.class);
-
-  private JsonNode configJson;
-
-  @Override
-  protected String getImageName() {
-    return "airbyte/destination-databricks:dev";
-  }
-
-  @Override
-  protected JsonNode getConfig() {
-    // TODO: Generate the configuration JSON file to be used for running the destination during the test
-    // configJson can either be static and read from secrets/config.json directly
-    // or created in the setup method
-    return configJson;
-  }
-
-  @Override
-  protected JsonNode getFailCheckConfig() {
-    // TODO return an invalid config which, when used to run the connector's check connection operation,
-    // should result in a failed connection check
-    return null;
-  }
-
-  @Override
-  protected List<JsonNode> retrieveRecords(TestDestinationEnv testEnv,
-                                           String streamName,
-                                           String namespace,
-                                           JsonNode streamSchema)
-      throws IOException {
-    // TODO Implement this method to retrieve records which were written to the destination by the connector.
- // Records returned from this method will be compared against records provided to the connector - // to verify they were written correctly - return null; - } - - @Override - protected void setup(TestDestinationEnv testEnv) { - // TODO Implement this method to run any setup actions needed before every test case - } - - @Override - protected void tearDown(TestDestinationEnv testEnv) { - // TODO Implement this method to run any cleanup actions needed after every test case - } - -} diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyConsumerFactory.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyConsumerFactory.java index a11d341b9144..9dcb075575bb 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyConsumerFactory.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyConsumerFactory.java @@ -24,6 +24,7 @@ package io.airbyte.integrations.destination.jdbc.copy; +import io.airbyte.commons.json.Jsons; import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.integrations.base.AirbyteMessageConsumer; import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; @@ -36,6 +37,8 @@ import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteRecordMessage; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import java.sql.Timestamp; +import java.time.Instant; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -91,7 +94,8 @@ private static Map createWrite for (var configuredStream : catalog.getStreams()) { var stream = configuredStream.getStream(); var pair = AirbyteStreamNameNamespacePair.fromAirbyteSteam(stream); - var copier = streamCopierFactory.create(defaultSchema, config, stagingFolder, configuredStream, namingResolver, database, sqlOperations); + var syncMode = configuredStream.getDestinationSyncMode(); + var copier = streamCopierFactory.create(defaultSchema, config, stagingFolder, syncMode, stream, namingResolver, database, sqlOperations); pairToCopier.put(pair, copier); } @@ -112,7 +116,8 @@ private static RecordWriter recordWriterFunction(Map { @@ -36,17 +35,10 @@ public interface StreamCopierFactory { StreamCopier create(String configuredSchema, T config, String stagingFolder, - ConfiguredAirbyteStream configuredStream, + DestinationSyncMode syncMode, + AirbyteStream stream, ExtendedNameTransformer nameTransformer, JdbcDatabase db, SqlOperations sqlOperations); - static String getSchema(AirbyteStream stream, String configuredSchema, ExtendedNameTransformer nameTransformer) { - if (stream.getNamespace() != null) { - return nameTransformer.convertStreamName(stream.getNamespace()); - } else { - return nameTransformer.convertStreamName(configuredSchema); - } - } - } diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.java index 69fe6dddff78..c74ee13e8389 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.java +++ 
b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.java @@ -30,12 +30,10 @@ import com.google.cloud.storage.BlobInfo; import com.google.cloud.storage.Storage; import com.google.cloud.storage.StorageOptions; -import io.airbyte.commons.json.Jsons; import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.integrations.destination.ExtendedNameTransformer; import io.airbyte.integrations.destination.jdbc.SqlOperations; import io.airbyte.integrations.destination.jdbc.copy.StreamCopier; -import io.airbyte.protocol.models.AirbyteRecordMessage; import io.airbyte.protocol.models.DestinationSyncMode; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -46,7 +44,6 @@ import java.nio.charset.StandardCharsets; import java.sql.SQLException; import java.sql.Timestamp; -import java.time.Instant; import java.util.UUID; import org.apache.commons.csv.CSVFormat; import org.apache.commons.csv.CSVPrinter; @@ -106,10 +103,8 @@ public GcsStreamCopier(String stagingFolder, } @Override - public void write(UUID id, AirbyteRecordMessage recordMessage) throws Exception { - csvPrinter.printRecord(id, - Jsons.serialize(recordMessage.getData()), - Timestamp.from(Instant.ofEpochMilli(recordMessage.getEmittedAt()))); + public void write(UUID id, String jsonDataString, Timestamp emittedAt) throws Exception { + csvPrinter.printRecord(id, jsonDataString, emittedAt); } @Override diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopierFactory.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopierFactory.java index eb2d44af8235..594eaf85285f 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopierFactory.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopierFactory.java @@ -28,12 +28,12 @@ import com.google.cloud.storage.Storage; import com.google.cloud.storage.StorageOptions; import io.airbyte.db.jdbc.JdbcDatabase; +import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.destination.ExtendedNameTransformer; import io.airbyte.integrations.destination.jdbc.SqlOperations; import io.airbyte.integrations.destination.jdbc.copy.StreamCopier; import io.airbyte.integrations.destination.jdbc.copy.StreamCopierFactory; import io.airbyte.protocol.models.AirbyteStream; -import io.airbyte.protocol.models.ConfiguredAirbyteStream; import io.airbyte.protocol.models.DestinationSyncMode; import java.io.ByteArrayInputStream; import java.io.InputStream; @@ -47,14 +47,14 @@ public abstract class GcsStreamCopierFactory implements StreamCopierFactory { @@ -43,17 +43,17 @@ public abstract class S3StreamCopierFactory implements StreamCopierFactory parquetWriter; private final AvroRecordFactory avroRecordFactory; - public final Schema parquetSchema; public S3ParquetWriter(S3DestinationConfig config, AmazonS3 s3Client, @@ -89,7 +88,6 @@ public S3ParquetWriter(S3DestinationConfig config, .withDictionaryEncoding(formatConfig.isDictionaryEncoding()) .build(); this.avroRecordFactory = new AvroRecordFactory(schema, nameUpdater); - this.parquetSchema = schema; } public static Configuration getHadoopConfig(S3DestinationConfig config) { diff --git a/docs/integrations/destinations/databricks.md 
b/docs/integrations/destinations/databricks.md
deleted file mode 100644
index d2570854d288..000000000000
--- a/docs/integrations/destinations/databricks.md
+++ /dev/null
@@ -1,52 +0,0 @@
-# Databricks
-
-TODO: update this doc
-
-## Sync overview
-
-### Output schema
-
-Is the output schema fixed (e.g: for an API like Stripe)? If so, point to the connector's schema (e.g: link to Stripe’s documentation) or describe the schema here directly (e.g: include a diagram or paragraphs describing the schema).
-
-Describe how the connector's schema is mapped to Airbyte concepts. An example description might be: "MagicDB tables become Airbyte Streams and MagicDB columns become Airbyte Fields. In addition, an extracted\_at column is appended to each row being read."
-
-### Data type mapping
-
-This section should contain a table mapping each of the connector's data types to Airbyte types. At the moment, Airbyte uses the same types used by [JSONSchema](https://json-schema.org/understanding-json-schema/reference/index.html). `string`, `date-time`, `object`, `array`, `boolean`, `integer`, and `number` are the most commonly used data types.
-
-| Integration Type | Airbyte Type | Notes |
-| :--- | :--- | :--- |
-
-
-### Features
-
-This section should contain a table with the following format:
-
-| Feature | Supported? (Yes/No) | Notes |
-| :--- | :--- | :--- |
-| Full Refresh Sync | | |
-| Incremental Sync | | |
-| Replicate Incremental Deletes | | |
-| For databases, WAL/Logical replication | | |
-| SSL connection | | |
-| SSH Tunnel Support | | |
-| (Any other source-specific features) | | |
-
-### Performance considerations
-
-Could this connector hurt the user's database/API/etc. or put too much strain on it in certain circumstances? For example, if there are a lot of tables or rows in a table? What is the breaking point (e.g: 100mm+ records)? What can the user do to prevent this? (e.g: use a read-only replica, or schedule frequent syncs, etc.)
-
-## Getting started
-
-### Requirements
-
-* What versions of this connector does this implementation support? (e.g: `postgres v3.14 and above`)
-* What configurations, if any, are required on the connector? (e.g: `buffer_size > 1024`)
-* Network accessibility requirements
-* Credentials/authentication requirements? (e.g: A DB user with read permissions on certain tables)
-
-### Setup guide
-
-For each of the above high-level requirements as appropriate, add or point to a follow-along guide. See existing source or destination guides for an example.
-
-For each major cloud provider we support, also add a follow-along guide for setting up Airbyte to connect to that destination. See the Postgres destination guide for an example of what this should look like.
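
For reference, the `truncateName` rule removed above is easy to sanity-check: names at or under the limit pass through unchanged, while longer names keep a prefix and a suffix joined by two underscores. The JUnit-style sketch below is hypothetical (the test class is not part of this patch) and assumes only the package-private method body shown in the diff; it sits in the same package so it can reach that method.

```
package io.airbyte.integrations.destination.databricks;

import static org.junit.jupiter.api.Assertions.assertEquals;

import org.junit.jupiter.api.Test;

// Hypothetical test, shown only to illustrate the truncation logic removed above.
class TruncateNameExampleTest {

  @Test
  void shortNamesPassThroughUnchanged() {
    assertEquals("users", DatabricksNameTransformer.truncateName("users", 15));
  }

  @Test
  void longNamesKeepPrefixAndSuffixJoinedByTwoUnderscores() {
    // maxLength = 15 -> allowedLength = 13 -> 6-char prefix + "__" + 6-char suffix
    assertEquals("abcdef__uvwxyz",
        DatabricksNameTransformer.truncateName("abcdefghijklmnopqrstuvwxyz", 15));
  }

}
```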
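The lasting effect of this revert on the retained `destination-jdbc` code is the copier signatures: `StreamCopierFactory.create` again receives a `DestinationSyncMode` plus `AirbyteStream` (rather than a `ConfiguredAirbyteStream`), and `StreamCopier.write` again takes the serialized record and its emitted-at timestamp rather than a full `AirbyteRecordMessage`. A minimal sketch of a copier built against the post-revert interfaces might look as follows; `NoOpStreamCopierFactory` and `ExampleConfig` are invented names, the interface's method set is inferred from the overrides visible in this diff, and everything beyond the signatures is an assumption, not the library's actual behavior.

```
package io.airbyte.integrations.destination.example;

import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.destination.jdbc.SqlOperations;
import io.airbyte.integrations.destination.jdbc.copy.StreamCopier;
import io.airbyte.integrations.destination.jdbc.copy.StreamCopierFactory;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.DestinationSyncMode;
import java.sql.Timestamp;
import java.util.UUID;

// Hypothetical config holder; real factories are parameterized on S3Config, GcsConfig, etc.
class ExampleConfig {}

public class NoOpStreamCopierFactory implements StreamCopierFactory<ExampleConfig> {

  @Override
  public StreamCopier create(String configuredSchema,
                             ExampleConfig config,
                             String stagingFolder,
                             DestinationSyncMode syncMode,
                             AirbyteStream stream,
                             ExtendedNameTransformer nameTransformer,
                             JdbcDatabase db,
                             SqlOperations sqlOperations) {
    return new StreamCopier() {

      // Post-revert signature: the consumer hands over the serialized record
      // and its emitted-at timestamp instead of a full AirbyteRecordMessage.
      @Override
      public void write(UUID id, String jsonDataString, Timestamp emittedAt) {
        // stage (id, jsonDataString, emittedAt) to a staging file here
      }

      @Override
      public void closeStagingUploader(boolean hasFailed) {}

      @Override
      public void createDestinationSchema() {}

      @Override
      public void createTemporaryTable() {}

      @Override
      public void copyStagingFileToTemporaryTable() {}

      @Override
      public String createDestinationTable() {
        return nameTransformer.getRawTableName(stream.getName());
      }

      @Override
      public String generateMergeStatement(String destTableName) {
        return "";
      }

      @Override
      public void removeFileAndDropTmpTable() {}

    };
  }

}
```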