diff --git a/.github/workflows/publish-command.yml b/.github/workflows/publish-command.yml index 6413daa648c0..11aff4e2eef6 100644 --- a/.github/workflows/publish-command.yml +++ b/.github/workflows/publish-command.yml @@ -172,6 +172,7 @@ jobs: SOURCE_ZUORA_TEST_CREDS: ${{ secrets.SOURCE_ZUORA_TEST_CREDS }} SOURCE_BAMBOO_HR_CREDS: ${{ secrets.SOURCE_BAMBOO_HR_CREDS }} SOURCE_BIGCOMMERCE_CREDS: ${{ secrets.SOURCE_BIGCOMMERCE_CREDS }} + DESTINATION_DATABRICKS_CREDS: ${{ secrets.DESTINATION_DATABRICKS_CREDS }} - run: | docker login -u airbytebot -p ${DOCKER_PASSWORD} ./tools/integrations/manage.sh publish airbyte-integrations/${{ github.event.inputs.connector }} ${{ github.event.inputs.run-tests }} diff --git a/.github/workflows/test-command.yml b/.github/workflows/test-command.yml index 192bc2776698..278fb9a70673 100644 --- a/.github/workflows/test-command.yml +++ b/.github/workflows/test-command.yml @@ -172,6 +172,7 @@ jobs: SOURCE_ZUORA_TEST_CREDS: ${{ secrets.SOURCE_ZUORA_TEST_CREDS }} SOURCE_BAMBOO_HR_CREDS: ${{ secrets.SOURCE_BAMBOO_HR_CREDS }} SOURCE_BIGCOMMERCE_CREDS: ${{ secrets.SOURCE_BIGCOMMERCE_CREDS }} + DESTINATION_DATABRICKS_CREDS: ${{ secrets.DESTINATION_DATABRICKS_CREDS }} - run: | ./tools/bin/ci_integration_test.sh ${{ github.event.inputs.connector }} name: test ${{ github.event.inputs.connector }} diff --git a/airbyte-integrations/builds.md b/airbyte-integrations/builds.md index b01b801d9652..0ad1a894f843 100644 --- a/airbyte-integrations/builds.md +++ b/airbyte-integrations/builds.md @@ -89,6 +89,7 @@ | :--- | :--- | | Azure Blob Storage | [![destination-azure-blob-storage](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-azure-blob-storage%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-azure-blob-storage) | | BigQuery | [![destination-bigquery](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-bigquery%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-bigquery) | +| Databricks | [![destination-bigquery](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-databricks%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-databricks) | | Google Cloud Storage (GCS) | [![destination-gcs](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-s3%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-gcs) | | Google PubSub | [![destination-pubsub](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-pubsub%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-pubsub) | | Kafka | [![destination-kafka](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-kafka%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-kafka) | diff --git a/airbyte-integrations/connectors/destination-databricks/.dockerignore b/airbyte-integrations/connectors/destination-databricks/.dockerignore new file mode 100644 index 000000000000..65c7d0ad3e73 --- /dev/null +++ b/airbyte-integrations/connectors/destination-databricks/.dockerignore @@ -0,0 +1,3 @@ +* +!Dockerfile +!build diff --git a/airbyte-integrations/connectors/destination-databricks/.gitignore 
b/airbyte-integrations/connectors/destination-databricks/.gitignore new file mode 100644 index 000000000000..c04f34fae172 --- /dev/null +++ b/airbyte-integrations/connectors/destination-databricks/.gitignore @@ -0,0 +1,6 @@ +# The driver is not checked into the source code due to legal reasons. +# You can download the driver here: +# https://databricks.com/spark/jdbc-drivers-download +# By downloading this driver, you agree to the terms & conditions: +# https://databricks.com/jdbc-odbc-driver-license +lib/SparkJDBC42.jar diff --git a/airbyte-integrations/connectors/destination-databricks/BOOTSTRAP.md b/airbyte-integrations/connectors/destination-databricks/BOOTSTRAP.md new file mode 100644 index 000000000000..85942c0e3175 --- /dev/null +++ b/airbyte-integrations/connectors/destination-databricks/BOOTSTRAP.md @@ -0,0 +1,6 @@ +# Databricks Destination Connector Bootstrap + +The Databricks Connector enables a developer to sync data into a Databricks cluster. It does so in two steps: + +1. Persist source data in S3 staging files in the Parquet format. +2. Create a delta table based on the Parquet staging files. diff --git a/airbyte-integrations/connectors/destination-databricks/Dockerfile b/airbyte-integrations/connectors/destination-databricks/Dockerfile new file mode 100644 index 000000000000..4c3d7fc644ca --- /dev/null +++ b/airbyte-integrations/connectors/destination-databricks/Dockerfile @@ -0,0 +1,11 @@ +FROM airbyte/integration-base-java:dev + +WORKDIR /airbyte +ENV APPLICATION destination-databricks + +COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar + +RUN tar xf ${APPLICATION}.tar --strip-components=1 + +LABEL io.airbyte.version=0.1.0 +LABEL io.airbyte.name=airbyte/destination-databricks diff --git a/airbyte-integrations/connectors/destination-databricks/README.md b/airbyte-integrations/connectors/destination-databricks/README.md new file mode 100644 index 000000000000..5a9ab5bf1cb1 --- /dev/null +++ b/airbyte-integrations/connectors/destination-databricks/README.md @@ -0,0 +1,82 @@ +# Destination Databricks + +This is the repository for the Databricks destination connector in Java. +For information about how to use this connector within Airbyte, see [the User Documentation](https://docs.airbyte.io/integrations/destinations/databricks). + +## Databricks JDBC Driver +This connector requires a JDBC driver to connect to a Databricks cluster. The driver is developed by Simba. Before downloading and using this driver, you must agree to the [JDBC ODBC driver license](https://databricks.com/jdbc-odbc-driver-license). This means that you can only use this driver to connect third-party applications to Apache Spark SQL within a Databricks offering using the ODBC and/or JDBC protocols. The driver can be downloaded from [here](https://databricks.com/spark/jdbc-drivers-download). + +This is currently a private connector that is only available in Airbyte Cloud. To build and publish this connector, first download the driver and put it under the `lib` directory. Please do not publish this connector publicly. We are working on a way to make it publicly available. + +## Local development + +#### Building via Gradle +From the Airbyte repository root, run: +``` +./gradlew :airbyte-integrations:connectors:destination-databricks:build +``` + +#### Create credentials +**If you are a community contributor**, you will need access to an AWS S3 bucket and a Databricks cluster to run the integration tests: + +- Create a Databricks cluster. See [documentation](https://docs.databricks.com/clusters/create.html). 
+- Create an S3 bucket. See [documentation](https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html#access-keys-and-secret-access-keys). +- Grant the Databricks cluster full access to the S3 bucket, or mount it as Databricks File System (DBFS). See [documentation](https://docs.databricks.com/data/data-sources/aws/amazon-s3.html). +- Place both the Databricks and S3 credentials in `sample_secrets/config.json`, which conforms to the spec file in `src/main/resources/spec.json`. +- Rename the directory from `sample_secrets` to `secrets`. +- Note that the `secrets` directory is git-ignored by default, so there is no danger of accidentally checking in sensitive information. + +**If you are an Airbyte core member**: + +- Get the `destination databricks creds` secrets from LastPass and put them in `sample_secrets/config.json`. +- Rename the directory from `sample_secrets` to `secrets`. + +### Locally running the connector docker image + +#### Build +Build the connector image via Gradle: +``` +./gradlew :airbyte-integrations:connectors:destination-databricks:airbyteDocker +``` +When building via Gradle, the docker image name and tag, respectively, are the values of the `io.airbyte.name` and `io.airbyte.version` `LABEL`s in +the Dockerfile. + +#### Run +Then run any of the connector commands as follows: +``` +docker run --rm airbyte/destination-databricks:dev spec +docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-databricks:dev check --config /secrets/config.json +docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-databricks:dev discover --config /secrets/config.json +docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/destination-databricks:dev read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json +``` + +## Testing +We use `JUnit` for Java tests. + +### Unit and Integration Tests +Place unit tests under `src/test/java/io/airbyte/integrations/destination/databricks`. + +#### Acceptance Tests +Airbyte has a standard test suite that all destination connectors must pass. Implement the `TODO`s in +`src/test-integration/java/io/airbyte/integrations/destination/databricks/DatabricksDestinationAcceptanceTest.java`. + +### Using gradle to run tests +All commands should be run from the Airbyte project root. +To run unit tests: +``` +./gradlew :airbyte-integrations:connectors:destination-databricks:unitTest +``` +To run acceptance and custom integration tests: +``` +./gradlew :airbyte-integrations:connectors:destination-databricks:integrationTest +``` + +## Dependency Management + +### Publishing a new version of the connector +You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what? +1. Make sure your changes are passing unit and integration tests. +1. Bump the connector version in `Dockerfile` -- just increment the value of the `LABEL io.airbyte.version` appropriately (we use [SemVer](https://semver.org/)). +1. Create a Pull Request. +1. Pat yourself on the back for being an awesome contributor. +1. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master. 
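Before wiring the credentials into `secrets/config.json`, it can help to confirm on their own that the downloaded Simba driver and the cluster credentials work. The sketch below is a minimal, hypothetical standalone check (the class name `DatabricksConnectionCheck` and all credential values are placeholders, not part of the connector). It reuses the connection details the connector itself uses elsewhere in this PR: the `com.simba.spark.jdbc.Driver` class, the literal user name `token` with the personal access token as the password, and the `jdbc:spark://...;transportMode=http;ssl=1;httpPath=...` URL format built by `DatabricksDestination.getDatabricksConnectionString`. It assumes `lib/SparkJDBC42.jar` is on the classpath.

```
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Properties;

public class DatabricksConnectionCheck {

  public static void main(String[] args) throws Exception {
    // Placeholder values; use the same fields that go into secrets/config.json.
    String serverHostname = "abc-12345678-wxyz.cloud.databricks.com";
    String httpPath = "sql/protocolvx/o/1234567489/0000-1111111-abcd90";
    String personalAccessToken = "<personal-access-token>";

    // Same URL format the connector builds in getDatabricksConnectionString.
    String url = String.format(
        "jdbc:spark://%s:443/default;transportMode=http;ssl=1;httpPath=%s;UserAgentEntry=Airbyte",
        serverHostname, httpPath);

    // Databricks expects the literal user name "token" and the personal access token as the password.
    Properties properties = new Properties();
    properties.put("user", "token");
    properties.put("password", personalAccessToken);

    // Requires lib/SparkJDBC42.jar (the Simba Spark JDBC driver) on the classpath.
    Class.forName("com.simba.spark.jdbc.Driver");
    try (Connection connection = DriverManager.getConnection(url, properties);
        Statement statement = connection.createStatement();
        ResultSet resultSet = statement.executeQuery("SELECT 1")) {
      resultSet.next();
      System.out.println("Connected; SELECT 1 returned " + resultSet.getInt(1));
    }
  }
}
```

If this check connects, the remaining integration-test prerequisites are on the S3 side: the bucket, region, and access keys supplied in the `data_source` section of the config.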
diff --git a/airbyte-integrations/connectors/destination-databricks/build.gradle b/airbyte-integrations/connectors/destination-databricks/build.gradle new file mode 100644 index 000000000000..24f6b9a9f062 --- /dev/null +++ b/airbyte-integrations/connectors/destination-databricks/build.gradle @@ -0,0 +1,31 @@ +plugins { + id 'application' + id 'airbyte-docker' + id 'airbyte-integration-test-java' +} + +application { + mainClass = 'io.airbyte.integrations.destination.databricks.DatabricksDestination' +} + +dependencies { + implementation project(':airbyte-db:lib') + implementation project(':airbyte-config:models') + implementation project(':airbyte-protocol:models') + implementation project(':airbyte-integrations:bases:base-java') + implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs) + implementation project(':airbyte-integrations:connectors:destination-jdbc') + implementation project(':airbyte-integrations:connectors:destination-s3') + // Spark JDBC is not checked into the repo for legal reason + implementation files("lib/SparkJDBC42.jar") + + // parquet + implementation group: 'org.apache.hadoop', name: 'hadoop-common', version: '3.3.0' + implementation group: 'org.apache.hadoop', name: 'hadoop-aws', version: '3.3.0' + implementation group: 'org.apache.hadoop', name: 'hadoop-mapreduce-client-core', version: '3.3.0' + implementation group: 'org.apache.parquet', name: 'parquet-avro', version: '1.12.0' + implementation group: 'tech.allegro.schema.json2avro', name: 'converter', version: '0.2.10' + + integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test') + integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-databricks') +} diff --git a/airbyte-integrations/connectors/destination-databricks/lib/.keep b/airbyte-integrations/connectors/destination-databricks/lib/.keep new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/airbyte-integrations/connectors/destination-databricks/sample_secrets/config.json b/airbyte-integrations/connectors/destination-databricks/sample_secrets/config.json new file mode 100644 index 000000000000..febd66273f7e --- /dev/null +++ b/airbyte-integrations/connectors/destination-databricks/sample_secrets/config.json @@ -0,0 +1,15 @@ +{ + "databricks_server_hostname": "required", + "databricks_http_path": "required", + "databricks_port": "443", + "databricks_personal_access_token": "required", + "database_schema": "public", + "data_source": { + "data_source_type": "S3", + "s3_bucket_name": "required", + "s3_bucket_path":"required", + "s3_bucket_region": "required", + "s3_access_key_id": "required", + "s3_secret_access_key": "required" + } +} diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksConstants.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksConstants.java new file mode 100644 index 000000000000..7b8dbbfacfd6 --- /dev/null +++ b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksConstants.java @@ -0,0 +1,14 @@ +package io.airbyte.integrations.destination.databricks; + +import java.util.Set; + +public class DatabricksConstants { + + public static final String DATABRICKS_USERNAME = "token"; + public static final String DATABRICKS_DRIVER_CLASS = "com.simba.spark.jdbc.Driver"; + + public static final 
Set DEFAULT_TBL_PROPERTIES = Set.of( + "delta.autoOptimize.optimizeWrite = true", + "delta.autoOptimize.autoCompact = true"); + +} diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestination.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestination.java new file mode 100644 index 000000000000..8b118e5cd44f --- /dev/null +++ b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestination.java @@ -0,0 +1,101 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package io.airbyte.integrations.destination.databricks; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.db.Databases; +import io.airbyte.db.jdbc.JdbcDatabase; +import io.airbyte.integrations.base.AirbyteMessageConsumer; +import io.airbyte.integrations.base.IntegrationRunner; +import io.airbyte.integrations.destination.ExtendedNameTransformer; +import io.airbyte.integrations.destination.jdbc.SqlOperations; +import io.airbyte.integrations.destination.jdbc.copy.CopyConsumerFactory; +import io.airbyte.integrations.destination.jdbc.copy.CopyDestination; +import io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import java.util.function.Consumer; + +public class DatabricksDestination extends CopyDestination { + + public DatabricksDestination() { + super("database_schema"); + } + + public static void main(String[] args) throws Exception { + new IntegrationRunner(new DatabricksDestination()).run(args); + } + + @Override + public AirbyteMessageConsumer getConsumer(JsonNode config, ConfiguredAirbyteCatalog catalog, Consumer outputRecordCollector) { + DatabricksDestinationConfig databricksConfig = DatabricksDestinationConfig.get(config); + return CopyConsumerFactory.create( + outputRecordCollector, + getDatabase(config), + getSqlOperations(), + getNameTransformer(), + databricksConfig, + catalog, + new DatabricksStreamCopierFactory(), + databricksConfig.getDatabaseSchema()); + } + + @Override + public void checkPersistence(JsonNode config) { + DatabricksDestinationConfig databricksConfig = DatabricksDestinationConfig.get(config); + S3StreamCopier.attemptS3WriteAndDelete(databricksConfig.getS3DestinationConfig().getS3Config()); + } + + @Override + public ExtendedNameTransformer getNameTransformer() { + return new DatabricksNameTransformer(); + } + + @Override + public JdbcDatabase getDatabase(JsonNode jsonConfig) { + return getDatabase(DatabricksDestinationConfig.get(jsonConfig)); + } + + @Override + public SqlOperations getSqlOperations() { + return new DatabricksSqlOperations(); + } + + static String getDatabricksConnectionString(DatabricksDestinationConfig databricksConfig) { + return String.format("jdbc:spark://%s:%s/default;transportMode=http;ssl=1;httpPath=%s;UserAgentEntry=Airbyte", + databricksConfig.getDatabricksServerHostname(), + databricksConfig.getDatabricksPort(), + databricksConfig.getDatabricksHttpPath()); + } + + static JdbcDatabase getDatabase(DatabricksDestinationConfig databricksConfig) { + return Databases.createJdbcDatabase( + DatabricksConstants.DATABRICKS_USERNAME, + databricksConfig.getDatabricksPersonalAccessToken(), + getDatabricksConnectionString(databricksConfig), + DatabricksConstants.DATABRICKS_DRIVER_CLASS); + } + +} diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestinationConfig.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestinationConfig.java new file mode 100644 index 000000000000..0c54c537cd82 --- /dev/null +++ b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestinationConfig.java @@ -0,0 +1,119 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this 
software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.databricks; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.airbyte.integrations.destination.s3.S3DestinationConfig; +import io.airbyte.integrations.destination.s3.parquet.S3ParquetFormatConfig; + +/** + * Currently only S3 is supported. So the data source config is always {@link S3DestinationConfig}. + */ +public class DatabricksDestinationConfig { + + static final String DEFAULT_DATABRICKS_PORT = "443"; + static final String DEFAULT_DATABASE_SCHEMA = "public"; + static final boolean DEFAULT_PURGE_STAGING_DATA = true; + + private final String databricksServerHostname; + private final String databricksHttpPath; + private final String databricksPort; + private final String databricksPersonalAccessToken; + private final String databaseSchema; + private final boolean purgeStagingData; + private final S3DestinationConfig s3DestinationConfig; + + public DatabricksDestinationConfig(String databricksServerHostname, + String databricksHttpPath, + String databricksPort, + String databricksPersonalAccessToken, + String databaseSchema, + boolean purgeStagingData, + S3DestinationConfig s3DestinationConfig) { + this.databricksServerHostname = databricksServerHostname; + this.databricksHttpPath = databricksHttpPath; + this.databricksPort = databricksPort; + this.databricksPersonalAccessToken = databricksPersonalAccessToken; + this.databaseSchema = databaseSchema; + this.purgeStagingData = purgeStagingData; + this.s3DestinationConfig = s3DestinationConfig; + } + + public static DatabricksDestinationConfig get(JsonNode config) { + return new DatabricksDestinationConfig( + config.get("databricks_server_hostname").asText(), + config.get("databricks_http_path").asText(), + config.has("databricks_port") ? config.get("databricks_port").asText() : DEFAULT_DATABRICKS_PORT, + config.get("databricks_personal_access_token").asText(), + config.has("database_schema") ? config.get("database_schema").asText() : DEFAULT_DATABASE_SCHEMA, + config.has("purge_staging_data") ? 
config.get("purge_staging_data").asBoolean() : DEFAULT_PURGE_STAGING_DATA, + getDataSource(config.get("data_source"))); + } + + public static S3DestinationConfig getDataSource(JsonNode dataSource) { + return new S3DestinationConfig( + "", + dataSource.get("s3_bucket_name").asText(), + dataSource.get("s3_bucket_path").asText(), + dataSource.get("s3_bucket_region").asText(), + dataSource.get("s3_access_key_id").asText(), + dataSource.get("s3_secret_access_key").asText(), + getDefaultParquetConfig()); + } + + public String getDatabricksServerHostname() { + return databricksServerHostname; + } + + private static S3ParquetFormatConfig getDefaultParquetConfig() { + return new S3ParquetFormatConfig(new ObjectMapper().createObjectNode()); + } + + public String getDatabricksHttpPath() { + return databricksHttpPath; + } + + public String getDatabricksPort() { + return databricksPort; + } + + public String getDatabricksPersonalAccessToken() { + return databricksPersonalAccessToken; + } + + public String getDatabaseSchema() { + return databaseSchema; + } + + public boolean isPurgeStagingData() { + return purgeStagingData; + } + + public S3DestinationConfig getS3DestinationConfig() { + return s3DestinationConfig; + } + +} diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksNameTransformer.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksNameTransformer.java new file mode 100644 index 000000000000..c0e81f5f2f0b --- /dev/null +++ b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksNameTransformer.java @@ -0,0 +1,56 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package io.airbyte.integrations.destination.databricks; + +import io.airbyte.integrations.destination.ExtendedNameTransformer; + +public class DatabricksNameTransformer extends ExtendedNameTransformer { + + @Override + public String convertStreamName(String input) { + return applyDefaultCase(super.convertStreamName(input)); + } + + @Override + public String getIdentifier(String name) { + return applyDefaultCase(super.getIdentifier(name)); + } + + @Override + public String getTmpTableName(String streamName) { + return applyDefaultCase(super.getTmpTableName(streamName)); + } + + @Override + public String getRawTableName(String streamName) { + return applyDefaultCase(super.getRawTableName(streamName)); + } + + @Override + protected String applyDefaultCase(String input) { + return input.toLowerCase(); + } + +} diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksSqlOperations.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksSqlOperations.java new file mode 100644 index 000000000000..a1d2654627f0 --- /dev/null +++ b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksSqlOperations.java @@ -0,0 +1,70 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.databricks; + +import io.airbyte.db.jdbc.JdbcDatabase; +import io.airbyte.integrations.base.JavaBaseConstants; +import io.airbyte.integrations.destination.jdbc.JdbcSqlOperations; +import io.airbyte.protocol.models.AirbyteRecordMessage; +import java.util.List; + +public class DatabricksSqlOperations extends JdbcSqlOperations { + + @Override + public void executeTransaction(JdbcDatabase database, List queries) throws Exception { + for (String query : queries) { + database.execute(query); + } + } + + /** + * Spark SQL does not support many of the data definition keywords and types as in Postgres. 
+ * Reference: https://spark.apache.org/docs/latest/sql-ref-datatypes.html + */ + @Override + public String createTableQuery(JdbcDatabase database, String schemaName, String tableName) { + return String.format( + "CREATE TABLE IF NOT EXISTS %s.%s (%s STRING, %s STRING, %s TIMESTAMP);", + schemaName, tableName, + JavaBaseConstants.COLUMN_NAME_AB_ID, + JavaBaseConstants.COLUMN_NAME_DATA, + JavaBaseConstants.COLUMN_NAME_EMITTED_AT); + } + + @Override + public void createSchemaIfNotExists(JdbcDatabase database, String schemaName) throws Exception { + database.execute(String.format("create database if not exists %s;", schemaName)); + } + + @Override + public void insertRecordsInternal(JdbcDatabase database, + List records, + String schemaName, + String tmpTableName) { + // Do nothing. The records are copied into the table directly from the staging parquet file. + // So no manual insertion is needed. + } + +} diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopier.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopier.java new file mode 100644 index 000000000000..9844d684711c --- /dev/null +++ b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopier.java @@ -0,0 +1,221 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package io.airbyte.integrations.destination.databricks; + +import com.amazonaws.services.s3.AmazonS3; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.airbyte.db.jdbc.JdbcDatabase; +import io.airbyte.integrations.destination.ExtendedNameTransformer; +import io.airbyte.integrations.destination.jdbc.SqlOperations; +import io.airbyte.integrations.destination.jdbc.copy.StreamCopier; +import io.airbyte.integrations.destination.s3.S3DestinationConfig; +import io.airbyte.integrations.destination.s3.parquet.S3ParquetFormatConfig; +import io.airbyte.integrations.destination.s3.parquet.S3ParquetWriter; +import io.airbyte.integrations.destination.s3.writer.S3WriterFactory; +import io.airbyte.protocol.models.AirbyteRecordMessage; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import io.airbyte.protocol.models.DestinationSyncMode; +import java.sql.Timestamp; +import java.util.UUID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This implementation is similar to + * {@link io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier}. The difference is that + * this implementation creates Parquet staging files, instead of CSV ones. + *

+ * It does the following operations: + *

  • 1. Parquet writer writes data stream into staging parquet file in + * s3://<bucket-name>/<bucket-path>/<staging-folder>.
  • 2. Create a tmp delta table based on the staging parquet file.
  • 3. Create the destination delta table based on the tmp delta table schema in + * s3://<bucket-name>/<stream-name>.
  • 4. Copy the staging parquet file into the destination delta table.
  • 5. Delete the tmp delta table, and the staging parquet file.
  • + */ +public class DatabricksStreamCopier implements StreamCopier { + + private static final Logger LOGGER = LoggerFactory.getLogger(DatabricksStreamCopier.class); + private static final ObjectMapper MAPPER = new ObjectMapper(); + + private final String schemaName; + private final String streamName; + private final DestinationSyncMode destinationSyncMode; + private final AmazonS3 s3Client; + private final S3DestinationConfig s3Config; + private final boolean purgeStagingData; + private final JdbcDatabase database; + private final DatabricksSqlOperations sqlOperations; + + private final String tmpTableName; + private final String destTableName; + private final S3ParquetWriter parquetWriter; + private final String tmpTableLocation; + private final String destTableLocation; + + public DatabricksStreamCopier(String stagingFolder, + String schema, + ConfiguredAirbyteStream configuredStream, + AmazonS3 s3Client, + JdbcDatabase database, + DatabricksDestinationConfig databricksConfig, + ExtendedNameTransformer nameTransformer, + SqlOperations sqlOperations, + S3WriterFactory writerFactory, + Timestamp uploadTime) + throws Exception { + this.schemaName = schema; + this.streamName = configuredStream.getStream().getName(); + this.destinationSyncMode = configuredStream.getDestinationSyncMode(); + this.s3Client = s3Client; + this.s3Config = databricksConfig.getS3DestinationConfig(); + this.purgeStagingData = databricksConfig.isPurgeStagingData(); + this.database = database; + this.sqlOperations = (DatabricksSqlOperations) sqlOperations; + + this.tmpTableName = nameTransformer.getTmpTableName(streamName); + this.destTableName = nameTransformer.getIdentifier(streamName); + + S3DestinationConfig stagingS3Config = getStagingS3DestinationConfig(s3Config, stagingFolder); + this.parquetWriter = (S3ParquetWriter) writerFactory.create(stagingS3Config, s3Client, configuredStream, uploadTime); + + this.tmpTableLocation = String.format("s3://%s/%s", + s3Config.getBucketName(), parquetWriter.getOutputPrefix()); + this.destTableLocation = String.format("s3://%s/%s/%s/%s", + s3Config.getBucketName(), s3Config.getBucketPath(), databricksConfig.getDatabaseSchema(), streamName); + + LOGGER.info("[Stream {}] Database schema: {}", streamName, schemaName); + LOGGER.info("[Stream {}] Parquet schema: {}", streamName, parquetWriter.getParquetSchema()); + LOGGER.info("[Stream {}] Tmp table {} location: {}", streamName, tmpTableName, tmpTableLocation); + LOGGER.info("[Stream {}] Data table {} location: {}", streamName, destTableName, destTableLocation); + + parquetWriter.initialize(); + } + + @Override + public void write(UUID id, AirbyteRecordMessage recordMessage) throws Exception { + parquetWriter.write(id, recordMessage); + } + + @Override + public void closeStagingUploader(boolean hasFailed) throws Exception { + parquetWriter.close(hasFailed); + } + + @Override + public void createDestinationSchema() throws Exception { + LOGGER.info("[Stream {}] Creating database schema if it does not exist: {}", streamName, schemaName); + sqlOperations.createSchemaIfNotExists(database, schemaName); + } + + @Override + public void createTemporaryTable() throws Exception { + LOGGER.info("[Stream {}] Creating tmp table {} from staging file: {}", streamName, tmpTableName, tmpTableLocation); + + sqlOperations.dropTableIfExists(database, schemaName, tmpTableName); + String createTmpTable = String.format("CREATE TABLE %s.%s USING parquet LOCATION '%s';", schemaName, tmpTableName, tmpTableLocation); + LOGGER.info(createTmpTable); + 
database.execute(createTmpTable); + } + + @Override + public void copyStagingFileToTemporaryTable() { + // The tmp table is created directly based on the staging file. So no separate copying step is + // needed. + } + + @Override + public String createDestinationTable() throws Exception { + LOGGER.info("[Stream {}] Creating destination table if it does not exist: {}", streamName, destTableName); + + String createStatement = destinationSyncMode == DestinationSyncMode.OVERWRITE + // "create or replace" is the recommended way to replace existing table + ? "CREATE OR REPLACE TABLE" + : "CREATE TABLE IF NOT EXISTS"; + + String createTable = String.format( + "%s %s.%s " + + "USING delta " + + "LOCATION '%s' " + + "COMMENT 'Created from stream %s' " + + "TBLPROPERTIES ('airbyte.destinationSyncMode' = '%s', %s) " + + // create the table based on the schema of the tmp table + "AS SELECT * FROM %s.%s LIMIT 0", + createStatement, + schemaName, destTableName, + destTableLocation, + streamName, + destinationSyncMode.value(), + String.join(", ", DatabricksConstants.DEFAULT_TBL_PROPERTIES), + schemaName, tmpTableName); + LOGGER.info(createTable); + database.execute(createTable); + + return destTableName; + } + + @Override + public String generateMergeStatement(String destTableName) { + String copyData = String.format( + "COPY INTO %s.%s " + + "FROM '%s' " + + "FILEFORMAT = PARQUET " + + "PATTERN = '%s'", + schemaName, destTableName, + tmpTableLocation, + parquetWriter.getOutputFilename()); + LOGGER.info(copyData); + return copyData; + } + + @Override + public void removeFileAndDropTmpTable() throws Exception { + if (purgeStagingData) { + LOGGER.info("[Stream {}] Deleting tmp table: {}", streamName, tmpTableName); + sqlOperations.dropTableIfExists(database, schemaName, tmpTableName); + + LOGGER.info("[Stream {}] Deleting staging file: {}", streamName, parquetWriter.getOutputFilePath()); + s3Client.deleteObject(s3Config.getBucketName(), parquetWriter.getOutputFilePath()); + } + } + + /** + * The staging data location is s3:////. This method + * creates an {@link S3DestinationConfig} whose bucket path is /. 
+ */ + static S3DestinationConfig getStagingS3DestinationConfig(S3DestinationConfig config, String stagingFolder) { + return new S3DestinationConfig( + config.getEndpoint(), + config.getBucketName(), + String.join("/", config.getBucketPath(), stagingFolder), + config.getBucketRegion(), + config.getAccessKeyId(), + config.getSecretAccessKey(), + // use default parquet format config + new S3ParquetFormatConfig(MAPPER.createObjectNode())); + } + +} diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopierFactory.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopierFactory.java new file mode 100644 index 000000000000..f1285f20e9df --- /dev/null +++ b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopierFactory.java @@ -0,0 +1,64 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package io.airbyte.integrations.destination.databricks; + +import com.amazonaws.services.s3.AmazonS3; +import io.airbyte.db.jdbc.JdbcDatabase; +import io.airbyte.integrations.destination.ExtendedNameTransformer; +import io.airbyte.integrations.destination.jdbc.SqlOperations; +import io.airbyte.integrations.destination.jdbc.copy.StreamCopier; +import io.airbyte.integrations.destination.jdbc.copy.StreamCopierFactory; +import io.airbyte.integrations.destination.s3.writer.ProductionWriterFactory; +import io.airbyte.integrations.destination.s3.writer.S3WriterFactory; +import io.airbyte.protocol.models.AirbyteStream; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import java.sql.Timestamp; + +public class DatabricksStreamCopierFactory implements StreamCopierFactory { + + @Override + public StreamCopier create(String configuredSchema, + DatabricksDestinationConfig databricksConfig, + String stagingFolder, + ConfiguredAirbyteStream configuredStream, + ExtendedNameTransformer nameTransformer, + JdbcDatabase database, + SqlOperations sqlOperations) { + try { + AirbyteStream stream = configuredStream.getStream(); + String schema = StreamCopierFactory.getSchema(stream.getNamespace(), configuredSchema, nameTransformer); + AmazonS3 s3Client = databricksConfig.getS3DestinationConfig().getS3Client(); + S3WriterFactory writerFactory = new ProductionWriterFactory(); + Timestamp uploadTimestamp = new Timestamp(System.currentTimeMillis()); + + return new DatabricksStreamCopier(stagingFolder, schema, configuredStream, s3Client, database, + databricksConfig, nameTransformer, sqlOperations, writerFactory, uploadTimestamp); + } catch (Exception e) { + throw new RuntimeException(e); + } + + } + +} diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-databricks/src/main/resources/spec.json new file mode 100644 index 000000000000..beea2dc50b53 --- /dev/null +++ b/airbyte-integrations/connectors/destination-databricks/src/main/resources/spec.json @@ -0,0 +1,145 @@ +{ + "documentationUrl": "https://docs.airbyte.io/integrations/destinations/databricks", + "supportsIncremental": true, + "supportsNormalization": false, + "supportsDBT": false, + "supported_destination_sync_modes": ["overwrite", "append"], + "connectionSpecification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Databricks Destination Spec", + "type": "object", + "required": [ + "databricks_server_hostname", + "databricks_http_path", + "databricks_personal_access_token", + "data_source" + ], + "additionalProperties": false, + "properties": { + "databricks_server_hostname": { + "title": "Databricks Cluster Server Hostname", + "type": "string", + "description": "", + "examples": ["abc-12345678-wxyz.cloud.databricks.com"] + }, + "databricks_http_path": { + "title": "Databricks Cluster HTTP Path", + "type": "string", + "description": "", + "examples": ["sql/protocolvx/o/1234567489/0000-1111111-abcd90"] + }, + "databricks_port": { + "title": "Databricks Cluster Port", + "type": "string", + "description": "", + "default": "443", + "examples": ["443"] + }, + "databricks_personal_access_token": { + "title": "Databricks Personal Access Token", + "type": "string", + "description": "", + "examples": [""], + "airbyte_secret": true + }, + "database_schema": { + "title": "Database Schema", + "type": "string", + "description": "The default schema tables are written to if the source does not specify a namespace. 
Unless specifically configured, the usual value for this field is \"public\".", + "default": "public", + "examples": ["public"] + }, + "data_source": { + "title": "Data Source", + "type": "object", + "description": "Storage on which the delta lake is built", + "oneOf": [ + { + "title": "Amazon S3", + "required": [ + "data_source_type", + "s3_bucket_name", + "s3_bucket_path", + "s3_bucket_region", + "s3_access_key_id", + "s3_secret_access_key" + ], + "properties": { + "data_source_type": { + "type": "string", + "enum": ["S3"], + "default": "S3" + }, + "s3_bucket_name": { + "title": "S3 Bucket Name", + "type": "string", + "description": "The name of the S3 bucket to use for intermittent staging of the data.", + "examples": ["airbyte.staging"] + }, + "s3_bucket_path": { + "Title": "S3 Bucket Path", + "type": "string", + "description": "The directory under the S3 bucket where data will be written.", + "examples": ["data_sync/test"] + }, + "s3_bucket_region": { + "title": "S3 Bucket Region", + "type": "string", + "default": "", + "description": "The region of the S3 staging bucket to use if utilising a copy strategy.", + "enum": [ + "", + "us-east-1", + "us-east-2", + "us-west-1", + "us-west-2", + "af-south-1", + "ap-east-1", + "ap-south-1", + "ap-northeast-1", + "ap-northeast-2", + "ap-northeast-3", + "ap-southeast-1", + "ap-southeast-2", + "ca-central-1", + "cn-north-1", + "cn-northwest-1", + "eu-central-1", + "eu-north-1", + "eu-south-1", + "eu-west-1", + "eu-west-2", + "eu-west-3", + "sa-east-1", + "me-south-1", + "us-gov-east-1", + "us-gov-west-1" + ] + }, + "s3_access_key_id": { + "type": "string", + "description": "The Access Key Id granting allow one to access the above S3 staging bucket. Airbyte requires Read and Write permissions to the given bucket.", + "title": "S3 Key Id", + "examples": ["A012345678910EXAMPLE"], + "airbyte_secret": true + }, + "s3_secret_access_key": { + "title": "S3 Access Key", + "type": "string", + "description": "The corresponding secret to the above access key id.", + "examples": ["a012345678910ABCDEFGH/AbCdEfGhEXAMPLEKEY"], + "airbyte_secret": true + } + } + } + ] + }, + "purge_staging_data": { + "title": "Purge Staging Files and Tables", + "type": "boolean", + "description": "Default to 'true'. 
Switch it to 'false' for debugging purpose.", + "default": true + } + } + } +} diff --git a/airbyte-integrations/connectors/destination-databricks/src/test-integration/java/io/airbyte/integrations/destination/databricks/DatabricksDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-databricks/src/test-integration/java/io/airbyte/integrations/destination/databricks/DatabricksDestinationAcceptanceTest.java new file mode 100644 index 000000000000..acfc74bfd7b7 --- /dev/null +++ b/airbyte-integrations/connectors/destination-databricks/src/test-integration/java/io/airbyte/integrations/destination/databricks/DatabricksDestinationAcceptanceTest.java @@ -0,0 +1,170 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package io.airbyte.integrations.destination.databricks; + +import static org.jooq.impl.DSL.asterisk; +import static org.jooq.impl.DSL.field; + +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.DeleteObjectsRequest; +import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion; +import com.amazonaws.services.s3.model.DeleteObjectsResult; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.airbyte.commons.io.IOs; +import io.airbyte.commons.json.Jsons; +import io.airbyte.db.Database; +import io.airbyte.db.Databases; +import io.airbyte.integrations.base.JavaBaseConstants; +import io.airbyte.integrations.destination.ExtendedNameTransformer; +import io.airbyte.integrations.destination.jdbc.copy.StreamCopierFactory; +import io.airbyte.integrations.destination.s3.S3DestinationConfig; +import io.airbyte.integrations.destination.s3.avro.JsonFieldNameUpdater; +import io.airbyte.integrations.destination.s3.util.AvroRecordHelper; +import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest; +import java.nio.file.Path; +import java.sql.SQLException; +import java.util.LinkedList; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.commons.lang3.RandomStringUtils; +import org.jooq.JSONFormat; +import org.jooq.JSONFormat.RecordFormat; +import org.jooq.SQLDialect; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DatabricksDestinationAcceptanceTest extends DestinationAcceptanceTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(DatabricksDestinationAcceptanceTest.class); + private static final String SECRETS_CONFIG_JSON = "secrets/config.json"; + private static final JSONFormat JSON_FORMAT = new JSONFormat().recordFormat(RecordFormat.OBJECT); + + private final ExtendedNameTransformer nameTransformer = new DatabricksNameTransformer(); + private JsonNode configJson; + private DatabricksDestinationConfig databricksConfig; + private S3DestinationConfig s3Config; + private AmazonS3 s3Client; + + @Override + protected String getImageName() { + return "airbyte/destination-databricks:dev"; + } + + @Override + protected JsonNode getConfig() { + return configJson; + } + + @Override + protected JsonNode getFailCheckConfig() { + JsonNode failCheckJson = Jsons.clone(configJson); + // set invalid credential + ((ObjectNode) failCheckJson.get("data_source")) + .put("s3_access_key_id", "fake-key") + .put("s3_secret_access_key", "fake-secret"); + return failCheckJson; + } + + @Override + protected List retrieveRecords(TestDestinationEnv testEnv, + String streamName, + String namespace, + JsonNode streamSchema) + throws SQLException { + String tableName = nameTransformer.getIdentifier(streamName); + String schemaName = StreamCopierFactory.getSchema(namespace, databricksConfig.getDatabaseSchema(), nameTransformer); + JsonFieldNameUpdater nameUpdater = AvroRecordHelper.getFieldNameUpdater(streamName, namespace, streamSchema); + + Database database = getDatabase(databricksConfig); + return database.query(ctx -> ctx.select(asterisk()) + .from(String.format("%s.%s", schemaName, tableName)) + .orderBy(field(JavaBaseConstants.COLUMN_NAME_EMITTED_AT).asc()) + .fetch().stream() + .map(record -> { + JsonNode json = Jsons.deserialize(record.formatJSON(JSON_FORMAT)); + JsonNode jsonWithOriginalFields = nameUpdater.getJsonWithOriginalFieldNames(json); + return 
AvroRecordHelper.pruneAirbyteJson(jsonWithOriginalFields); + }) + .collect(Collectors.toList())); + } + + @Override + protected void setup(TestDestinationEnv testEnv) { + JsonNode baseConfigJson = Jsons.deserialize(IOs.readFile(Path.of(SECRETS_CONFIG_JSON))); + + // Set a random s3 bucket path and database schema for each integration test + String randomString = RandomStringUtils.randomAlphanumeric(5); + JsonNode configJson = Jsons.clone(baseConfigJson); + ((ObjectNode) configJson).put("database_schema", "integration_test_" + randomString); + JsonNode dataSource = configJson.get("data_source"); + ((ObjectNode) dataSource).put("s3_bucket_path", "test_" + randomString); + + this.configJson = configJson; + this.databricksConfig = DatabricksDestinationConfig.get(configJson); + this.s3Config = databricksConfig.getS3DestinationConfig(); + LOGGER.info("Test full path: s3://{}/{}", s3Config.getBucketName(), s3Config.getBucketPath()); + + this.s3Client = s3Config.getS3Client(); + } + + @Override + protected void tearDown(TestDestinationEnv testEnv) throws SQLException { + // clean up s3 + List keysToDelete = new LinkedList<>(); + List objects = s3Client + .listObjects(s3Config.getBucketName(), s3Config.getBucketPath()) + .getObjectSummaries(); + for (S3ObjectSummary object : objects) { + keysToDelete.add(new KeyVersion(object.getKey())); + } + + if (keysToDelete.size() > 0) { + LOGGER.info("Tearing down test bucket path: {}/{}", s3Config.getBucketName(), + s3Config.getBucketPath()); + DeleteObjectsResult result = s3Client + .deleteObjects(new DeleteObjectsRequest(s3Config.getBucketName()).withKeys(keysToDelete)); + LOGGER.info("Deleted {} file(s).", result.getDeletedObjects().size()); + } + + // clean up database + LOGGER.info("Dropping database schema {}", databricksConfig.getDatabaseSchema()); + Database database = getDatabase(databricksConfig); + // we cannot use jooq dropSchemaIfExists method here because there is no proper dialect for + // Databricks, and it incorrectly quotes the schema name + database.query(ctx -> ctx.execute(String.format("DROP SCHEMA IF EXISTS %s CASCADE;", databricksConfig.getDatabaseSchema()))); + } + + private static Database getDatabase(DatabricksDestinationConfig databricksConfig) { + return Databases.createDatabase( + DatabricksConstants.DATABRICKS_USERNAME, + databricksConfig.getDatabricksPersonalAccessToken(), + DatabricksDestination.getDatabricksConnectionString(databricksConfig), + DatabricksConstants.DATABRICKS_DRIVER_CLASS, + SQLDialect.DEFAULT); + } + +} diff --git a/airbyte-integrations/connectors/destination-databricks/src/test/java/io/airbyte/integrations/destination/databricks/DatabricksDestinationConfigTest.java b/airbyte-integrations/connectors/destination-databricks/src/test/java/io/airbyte/integrations/destination/databricks/DatabricksDestinationConfigTest.java new file mode 100644 index 000000000000..7bf3f05e253b --- /dev/null +++ b/airbyte-integrations/connectors/destination-databricks/src/test/java/io/airbyte/integrations/destination/databricks/DatabricksDestinationConfigTest.java @@ -0,0 +1,63 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * 
furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.databricks; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.junit.jupiter.api.Test; + +class DatabricksDestinationConfigTest { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + @Test + public void testConfigCreationFromJson() { + ObjectNode dataSourceConfig = OBJECT_MAPPER.createObjectNode() + .put("data_source_type", "S3") + .put("s3_bucket_name", "bucket_name") + .put("s3_bucket_path", "bucket_path") + .put("s3_bucket_region", "bucket_region") + .put("s3_access_key_id", "access_key_id") + .put("s3_secret_access_key", "secret_access_key"); + + ObjectNode databricksConfig = OBJECT_MAPPER.createObjectNode() + .put("databricks_server_hostname", "server_hostname") + .put("databricks_http_path", "http_path") + .put("databricks_personal_access_token", "pak") + .set("data_source", dataSourceConfig); + + DatabricksDestinationConfig config1 = DatabricksDestinationConfig.get(databricksConfig); + assertEquals(DatabricksDestinationConfig.DEFAULT_DATABRICKS_PORT, config1.getDatabricksPort()); + assertEquals(DatabricksDestinationConfig.DEFAULT_DATABASE_SCHEMA, config1.getDatabaseSchema()); + + databricksConfig.put("databricks_port", "1000").put("database_schema", "testing_schema"); + DatabricksDestinationConfig config2 = DatabricksDestinationConfig.get(databricksConfig); + assertEquals("1000", config2.getDatabricksPort()); + assertEquals("testing_schema", config2.getDatabaseSchema()); + } + +} diff --git a/airbyte-integrations/connectors/destination-databricks/src/test/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopierTest.java b/airbyte-integrations/connectors/destination-databricks/src/test/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopierTest.java new file mode 100644 index 000000000000..ffbfc398f6ed --- /dev/null +++ b/airbyte-integrations/connectors/destination-databricks/src/test/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopierTest.java @@ -0,0 +1,44 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. 
+ * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.databricks; + +import static org.junit.jupiter.api.Assertions.*; + +import io.airbyte.integrations.destination.s3.S3DestinationConfig; +import java.util.UUID; +import org.junit.jupiter.api.Test; + +class DatabricksStreamCopierTest { + + @Test + public void testGetStagingS3DestinationConfig() { + String bucketPath = UUID.randomUUID().toString(); + S3DestinationConfig config = new S3DestinationConfig("", "", bucketPath, "", "", "", null); + String stagingFolder = UUID.randomUUID().toString(); + S3DestinationConfig stagingConfig = DatabricksStreamCopier.getStagingS3DestinationConfig(config, stagingFolder); + assertEquals(String.format("%s/%s", bucketPath, stagingFolder), stagingConfig.getBucketPath()); + } + +} diff --git a/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsAvroDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsAvroDestinationAcceptanceTest.java index 230cd278c91c..fa11d93a79bb 100644 --- a/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsAvroDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsAvroDestinationAcceptanceTest.java @@ -31,6 +31,7 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.destination.s3.S3Format; import io.airbyte.integrations.destination.s3.avro.JsonFieldNameUpdater; +import io.airbyte.integrations.destination.s3.util.AvroRecordHelper; import java.util.LinkedList; import java.util.List; import org.apache.avro.file.DataFileReader; diff --git a/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsParquetDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsParquetDestinationAcceptanceTest.java index f5ddccd8d44e..ef93d70f26db 100644 --- a/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsParquetDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsParquetDestinationAcceptanceTest.java @@ -32,6 +32,7 @@ import io.airbyte.integrations.destination.gcs.parquet.GcsParquetWriter; import io.airbyte.integrations.destination.s3.S3Format; import io.airbyte.integrations.destination.s3.avro.JsonFieldNameUpdater; +import io.airbyte.integrations.destination.s3.util.AvroRecordHelper; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyConsumerFactory.java 
b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyConsumerFactory.java index 9dcb075575bb..a11d341b9144 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyConsumerFactory.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyConsumerFactory.java @@ -24,7 +24,6 @@ package io.airbyte.integrations.destination.jdbc.copy; -import io.airbyte.commons.json.Jsons; import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.integrations.base.AirbyteMessageConsumer; import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; @@ -37,8 +36,6 @@ import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteRecordMessage; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; -import java.sql.Timestamp; -import java.time.Instant; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -94,8 +91,7 @@ private static Map createWrite for (var configuredStream : catalog.getStreams()) { var stream = configuredStream.getStream(); var pair = AirbyteStreamNameNamespacePair.fromAirbyteSteam(stream); - var syncMode = configuredStream.getDestinationSyncMode(); - var copier = streamCopierFactory.create(defaultSchema, config, stagingFolder, syncMode, stream, namingResolver, database, sqlOperations); + var copier = streamCopierFactory.create(defaultSchema, config, stagingFolder, configuredStream, namingResolver, database, sqlOperations); pairToCopier.put(pair, copier); } @@ -116,8 +112,7 @@ private static RecordWriter recordWriterFunction(Map { StreamCopier create(String configuredSchema, T config, String stagingFolder, - DestinationSyncMode syncMode, - AirbyteStream stream, + ConfiguredAirbyteStream configuredStream, ExtendedNameTransformer nameTransformer, JdbcDatabase db, SqlOperations sqlOperations); + static String getSchema(String namespace, String configuredSchema, ExtendedNameTransformer nameTransformer) { + if (namespace != null) { + return nameTransformer.convertStreamName(namespace); + } else { + return nameTransformer.convertStreamName(configuredSchema); + } + } + } diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.java index c74ee13e8389..69fe6dddff78 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.java @@ -30,10 +30,12 @@ import com.google.cloud.storage.BlobInfo; import com.google.cloud.storage.Storage; import com.google.cloud.storage.StorageOptions; +import io.airbyte.commons.json.Jsons; import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.integrations.destination.ExtendedNameTransformer; import io.airbyte.integrations.destination.jdbc.SqlOperations; import io.airbyte.integrations.destination.jdbc.copy.StreamCopier; +import io.airbyte.protocol.models.AirbyteRecordMessage; import io.airbyte.protocol.models.DestinationSyncMode; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -44,6 +46,7 @@ import java.nio.charset.StandardCharsets; import java.sql.SQLException; 
import java.sql.Timestamp; +import java.time.Instant; import java.util.UUID; import org.apache.commons.csv.CSVFormat; import org.apache.commons.csv.CSVPrinter; @@ -103,8 +106,10 @@ public GcsStreamCopier(String stagingFolder, } @Override - public void write(UUID id, String jsonDataString, Timestamp emittedAt) throws Exception { - csvPrinter.printRecord(id, jsonDataString, emittedAt); + public void write(UUID id, AirbyteRecordMessage recordMessage) throws Exception { + csvPrinter.printRecord(id, + Jsons.serialize(recordMessage.getData()), + Timestamp.from(Instant.ofEpochMilli(recordMessage.getEmittedAt()))); } @Override diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopierFactory.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopierFactory.java index 594eaf85285f..358da1a64497 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopierFactory.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopierFactory.java @@ -28,12 +28,12 @@ import com.google.cloud.storage.Storage; import com.google.cloud.storage.StorageOptions; import io.airbyte.db.jdbc.JdbcDatabase; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.destination.ExtendedNameTransformer; import io.airbyte.integrations.destination.jdbc.SqlOperations; import io.airbyte.integrations.destination.jdbc.copy.StreamCopier; import io.airbyte.integrations.destination.jdbc.copy.StreamCopierFactory; import io.airbyte.protocol.models.AirbyteStream; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; import io.airbyte.protocol.models.DestinationSyncMode; import java.io.ByteArrayInputStream; import java.io.InputStream; @@ -47,14 +47,14 @@ public abstract class GcsStreamCopierFactory implements StreamCopierFactory { @@ -43,17 +43,17 @@ public abstract class S3StreamCopierFactory implements StreamCopierFactory parquetWriter; private final AvroRecordFactory avroRecordFactory; + private final Schema parquetSchema; + private final String outputFilename; public S3ParquetWriter(S3DestinationConfig config, AmazonS3 s3Client, @@ -66,10 +68,10 @@ public S3ParquetWriter(S3DestinationConfig config, throws URISyntaxException, IOException { super(config, s3Client, configuredStream); - String outputFilename = BaseS3Writer.getOutputFilename(uploadTimestamp, S3Format.PARQUET); + this.outputFilename = BaseS3Writer.getOutputFilename(uploadTimestamp, S3Format.PARQUET); String objectKey = String.join("/", outputPrefix, outputFilename); - LOGGER.info("Full S3 path for stream '{}': {}/{}", stream.getName(), config.getBucketName(), + LOGGER.info("Full S3 path for stream '{}': s3://{}/{}", stream.getName(), config.getBucketName(), objectKey); URI uri = new URI( @@ -88,6 +90,7 @@ public S3ParquetWriter(S3DestinationConfig config, .withDictionaryEncoding(formatConfig.isDictionaryEncoding()) .build(); this.avroRecordFactory = new AvroRecordFactory(schema, nameUpdater); + this.parquetSchema = schema; } public static Configuration getHadoopConfig(S3DestinationConfig config) { @@ -105,6 +108,21 @@ public static Configuration getHadoopConfig(S3DestinationConfig config) { return hadoopConfig; } + public Schema getParquetSchema() { + return parquetSchema; + } + + /** + * The file path includes 
prefix and filename, but does not include the bucket name. + */ + public String getOutputFilePath() { + return outputPrefix + "/" + outputFilename; + } + + public String getOutputFilename() { + return outputFilename; + } + @Override public void write(UUID id, AirbyteRecordMessage recordMessage) throws IOException { parquetWriter.write(avroRecordFactory.getAvroRecord(id, recordMessage)); diff --git a/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/AvroRecordHelper.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/util/AvroRecordHelper.java similarity index 94% rename from airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/AvroRecordHelper.java rename to airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/util/AvroRecordHelper.java index db7cf31e1d7f..839fcce27dc3 100644 --- a/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/AvroRecordHelper.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/util/AvroRecordHelper.java @@ -22,7 +22,7 @@ * SOFTWARE. */ -package io.airbyte.integrations.destination.gcs; +package io.airbyte.integrations.destination.s3.util; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -31,6 +31,9 @@ import io.airbyte.integrations.destination.s3.avro.JsonFieldNameUpdater; import io.airbyte.integrations.destination.s3.avro.JsonToAvroSchemaConverter; +/** + * Helper methods for unit tests. This is needed by multiple modules, so it is in the src directory. + */ public class AvroRecordHelper { public static JsonFieldNameUpdater getFieldNameUpdater(String streamName, String namespace, JsonNode streamSchema) { diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/writer/BaseS3Writer.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/writer/BaseS3Writer.java index cf2b2aecb5b8..caba07b4ebc1 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/writer/BaseS3Writer.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/writer/BaseS3Writer.java @@ -72,6 +72,10 @@ protected BaseS3Writer(S3DestinationConfig config, this.outputPrefix = S3OutputPathHelper.getOutputPrefix(config.getBucketPath(), stream); } + public String getOutputPrefix() { + return outputPrefix; + } + /** *
   * <li>1. Create bucket if necessary.
   * <li>2. Under OVERWRITE mode, delete all objects with the output prefix.
  • diff --git a/airbyte-integrations/connectors/destination-s3/src/test-integration/java/io/airbyte/integrations/destination/s3/AvroRecordHelper.java b/airbyte-integrations/connectors/destination-s3/src/test-integration/java/io/airbyte/integrations/destination/s3/AvroRecordHelper.java deleted file mode 100644 index 83b3d5134a97..000000000000 --- a/airbyte-integrations/connectors/destination-s3/src/test-integration/java/io/airbyte/integrations/destination/s3/AvroRecordHelper.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 Airbyte - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package io.airbyte.integrations.destination.s3; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ObjectNode; -import io.airbyte.commons.util.MoreIterators; -import io.airbyte.integrations.base.JavaBaseConstants; -import io.airbyte.integrations.destination.s3.avro.JsonFieldNameUpdater; -import io.airbyte.integrations.destination.s3.avro.JsonToAvroSchemaConverter; - -public class AvroRecordHelper { - - public static JsonFieldNameUpdater getFieldNameUpdater(String streamName, String namespace, JsonNode streamSchema) { - JsonToAvroSchemaConverter schemaConverter = new JsonToAvroSchemaConverter(); - schemaConverter.getAvroSchema(streamSchema, streamName, namespace, true); - return new JsonFieldNameUpdater(schemaConverter.getStandardizedNames()); - } - - /** - * Convert an Airbyte JsonNode from Avro / Parquet Record to a plain one. - *
- * <li>Remove the airbyte id and emission timestamp fields.
- * <li>Remove null fields that must exist in Parquet but does not in original Json.
  • This - * function mutates the input Json. - */ - public static JsonNode pruneAirbyteJson(JsonNode input) { - ObjectNode output = (ObjectNode) input; - - // Remove Airbyte columns. - output.remove(JavaBaseConstants.COLUMN_NAME_AB_ID); - output.remove(JavaBaseConstants.COLUMN_NAME_EMITTED_AT); - - // Fields with null values does not exist in the original Json but only in Parquet. - for (String field : MoreIterators.toList(output.fieldNames())) { - if (output.get(field) == null || output.get(field).isNull()) { - output.remove(field); - } - } - - return output; - } - -} diff --git a/airbyte-integrations/connectors/destination-s3/src/test-integration/java/io/airbyte/integrations/destination/s3/S3AvroDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-s3/src/test-integration/java/io/airbyte/integrations/destination/s3/S3AvroDestinationAcceptanceTest.java index db7ca2343784..9d12d8cbd5cc 100644 --- a/airbyte-integrations/connectors/destination-s3/src/test-integration/java/io/airbyte/integrations/destination/s3/S3AvroDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-s3/src/test-integration/java/io/airbyte/integrations/destination/s3/S3AvroDestinationAcceptanceTest.java @@ -30,6 +30,7 @@ import com.fasterxml.jackson.databind.ObjectReader; import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.destination.s3.avro.JsonFieldNameUpdater; +import io.airbyte.integrations.destination.s3.util.AvroRecordHelper; import java.util.LinkedList; import java.util.List; import org.apache.avro.file.DataFileReader; diff --git a/airbyte-integrations/connectors/destination-s3/src/test-integration/java/io/airbyte/integrations/destination/s3/S3DestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-s3/src/test-integration/java/io/airbyte/integrations/destination/s3/S3DestinationAcceptanceTest.java index 12eb55522f06..93a44b41e5e7 100644 --- a/airbyte-integrations/connectors/destination-s3/src/test-integration/java/io/airbyte/integrations/destination/s3/S3DestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-s3/src/test-integration/java/io/airbyte/integrations/destination/s3/S3DestinationAcceptanceTest.java @@ -24,13 +24,7 @@ package io.airbyte.integrations.destination.s3; -import com.amazonaws.ClientConfiguration; -import com.amazonaws.auth.AWSCredentials; -import com.amazonaws.auth.AWSStaticCredentialsProvider; -import com.amazonaws.auth.BasicAWSCredentials; -import com.amazonaws.client.builder.AwsClientBuilder; import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.AmazonS3ClientBuilder; import com.amazonaws.services.s3.model.DeleteObjectsRequest; import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion; import com.amazonaws.services.s3.model.DeleteObjectsResult; @@ -143,27 +137,7 @@ protected void setup(TestDestinationEnv testEnv) { this.config = S3DestinationConfig.getS3DestinationConfig(configJson); LOGGER.info("Test full path: {}/{}", config.getBucketName(), config.getBucketPath()); - AWSCredentials awsCreds = new BasicAWSCredentials(config.getAccessKeyId(), - config.getSecretAccessKey()); - String endpoint = config.getEndpoint(); - - if (endpoint.isEmpty()) { - this.s3Client = AmazonS3ClientBuilder.standard() - .withCredentials(new AWSStaticCredentialsProvider(awsCreds)) - .withRegion(config.getBucketRegion()) - .build(); - } else { - ClientConfiguration clientConfiguration = new ClientConfiguration(); - 
clientConfiguration.setSignerOverride("AWSS3V4SignerType"); - - this.s3Client = AmazonS3ClientBuilder - .standard() - .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint, config.getBucketRegion())) - .withPathStyleAccessEnabled(true) - .withClientConfiguration(clientConfiguration) - .withCredentials(new AWSStaticCredentialsProvider(awsCreds)) - .build(); - } + this.s3Client = config.getS3Client(); } /** diff --git a/airbyte-integrations/connectors/destination-s3/src/test-integration/java/io/airbyte/integrations/destination/s3/S3ParquetDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-s3/src/test-integration/java/io/airbyte/integrations/destination/s3/S3ParquetDestinationAcceptanceTest.java index 6166a8869bd6..07b0bba1ef82 100644 --- a/airbyte-integrations/connectors/destination-s3/src/test-integration/java/io/airbyte/integrations/destination/s3/S3ParquetDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-s3/src/test-integration/java/io/airbyte/integrations/destination/s3/S3ParquetDestinationAcceptanceTest.java @@ -31,6 +31,7 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.destination.s3.avro.JsonFieldNameUpdater; import io.airbyte.integrations.destination.s3.parquet.S3ParquetWriter; +import io.airbyte.integrations.destination.s3.util.AvroRecordHelper; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md index 6d76f82134ab..f190bf488a7d 100644 --- a/docs/SUMMARY.md +++ b/docs/SUMMARY.md @@ -132,6 +132,7 @@ * [Destinations](integrations/destinations/README.md) * [AzureBlobStorage](integrations/destinations/azureblobstorage.md) * [BigQuery](integrations/destinations/bigquery.md) + * [Databricks](integrations/destinations/databricks.md) * [DynamoDB](integrations/destinations/dynamodb.md) * [Chargify](integrations/destinations/keen.md) * [Google Cloud Storage (GCS)](integrations/destinations/gcs.md) diff --git a/docs/integrations/README.md b/docs/integrations/README.md index 4781c3f0bcb4..9962600065c6 100644 --- a/docs/integrations/README.md +++ b/docs/integrations/README.md @@ -116,6 +116,7 @@ Airbyte uses a grading system for connectors to help users understand what to ex |[AzureBlobStorage](./destinations/azureblobstorage.md)| Alpha | |[BigQuery](./destinations/bigquery.md)| Certified | |[Chargify (Keen)](./destinations/keen.md)| Alpha | +|[Databricks](./destinations/databricks.md) | Beta | |[Google Cloud Storage (GCS)](./destinations/gcs.md)| Alpha | |[Google Pubsub](./destinations/pubsub.md)| Alpha | |[Kafka](./destinations/kafka.md)| Alpha | diff --git a/docs/integrations/destinations/databricks.md b/docs/integrations/destinations/databricks.md new file mode 100644 index 000000000000..c1678d279426 --- /dev/null +++ b/docs/integrations/destinations/databricks.md @@ -0,0 +1,105 @@ +# Databricks + +## Overview + +This destination syncs data to Databricks cluster. Each stream is written to its own table. + +This connector requires a JDBC driver to connect to Databricks cluster. The driver is developed by Simba. Before using the driver and the connector, you must agree to the [JDBC ODBC driver license](https://databricks.com/jdbc-odbc-driver-license). This means that you can only use this connector to connector third party applications to Apache Spark SQL within a Databricks offering using the ODBC and/or JDBC protocols. 
+
+Due to legal reasons, this is currently a private connector that is only available in Airbyte Cloud. We are working on making it publicly available. Please follow [this issue](https://github.com/airbytehq/airbyte/issues/6043) for progress.
+
+## Sync Mode
+
+| Feature | Support | Notes |
+| :--- | :---: | :--- |
+| Full Refresh Sync | ✅ | Warning: this mode deletes all previously synced data in the configured bucket path. |
+| Incremental - Append Sync | ✅ | |
+| Incremental - Dedupe Sync | ❌ | |
+| Namespaces | ✅ | |
+
+## Configuration
+
+Databricks parameters
+
+| Category | Parameter | Type | Notes |
+| :--- | :--- | :---: | :--- |
+| Databricks | Server Hostname | string | Required. See [documentation](https://docs.databricks.com/integrations/bi/jdbc-odbc-bi.html#get-server-hostname-port-http-path-and-jdbc-url). |
+| | HTTP Path | string | Required. See [documentation](https://docs.databricks.com/integrations/bi/jdbc-odbc-bi.html#get-server-hostname-port-http-path-and-jdbc-url). |
+| | Port | string | Optional. Defaults to "443". See [documentation](https://docs.databricks.com/integrations/bi/jdbc-odbc-bi.html#get-server-hostname-port-http-path-and-jdbc-url). |
+| | Personal Access Token | string | Required. See [documentation](https://docs.databricks.com/sql/user/security/personal-access-tokens.html). |
+| General | Database schema | string | Optional. Defaults to "public". Each data stream will be written to a table under this database schema. |
+| | Purge Staging Files and Tables | boolean | The connector creates staging files and tables on S3. By default, they will be purged when the data sync is complete. Set it to `false` for debugging purposes. |
+| S3 | Bucket Name | string | Name of the bucket to sync data into. |
+| | Bucket Path | string | Subdirectory under the above bucket to sync the data into. |
+| | Region | string | See [documentation](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html#concepts-available-regions) for all region codes. |
+| | Access Key ID | string | AWS/Minio credential. |
+| | Secret Access Key | string | AWS/Minio credential. |
+
+⚠️ Please note that under "Full Refresh Sync" mode, data in the configured bucket and path will be wiped out before each sync. We recommend provisioning a dedicated S3 resource for this sync to prevent unexpected data deletion caused by misconfiguration. ⚠️
+
+## Staging Parquet Files
+
+Data streams are first written as staging Parquet files on S3, and then loaded into Databricks tables. All the staging files will be deleted after the sync is done. For debugging purposes, here is the full path of a staging file:
+
+```
+s3://<bucket-name>/<bucket-path>/<staging-folder>/<stream-name>
+```
+
+For example:
+
+```
+s3://testing_bucket/data_output_path/98c450be-5b1c-422d-b8b5-6ca9903727d9/users
+     ↑              ↑                ↑                                    ↑
+     |              |                |                                    stream name
+     |              |                staging folder (random UUID)
+     |              bucket path
+     bucket name
+```
+
+## Unmanaged Spark SQL Table
+
+Currently, all streams are synced into unmanaged Spark SQL tables. See [documentation](https://docs.databricks.com/data/tables.html#managed-and-unmanaged-tables) for details. In summary, you have full control of the location of the data underlying an unmanaged table. The full path of each data stream is:
+
+```
+s3://<bucket-name>/<bucket-path>/<database-schema>/<stream-name>
+```
+
+For example:
+
+```
+s3://testing_bucket/data_output_path/public/users
+     ↑              ↑                ↑      ↑
+     |              |                |      stream name
+     |              |                database schema
+     |              bucket path
+     bucket name
+```
+
+Please keep these data directories on S3. Otherwise, the corresponding tables will have no data in Databricks.
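+
+For a quick sanity check after a sync, you can query the resulting table through the same Simba JDBC driver that the connector uses. The snippet below is only an illustrative sketch, not part of the connector: the driver class name (`com.simba.spark.jdbc.Driver`), the JDBC URL shape, and the `public.users` table are assumptions and placeholders, so copy the exact JDBC URL and credentials from your cluster's JDBC/ODBC settings and substitute your own database schema and stream name.
+
+```java
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+
+public class CheckSyncedTable {
+
+  public static void main(String[] args) throws Exception {
+    // Placeholder URL in the shape documented for the Simba Spark JDBC driver;
+    // replace the bracketed values with the parameters configured above.
+    String url = "jdbc:spark://<server-hostname>:443/default"
+        + ";transportMode=http;ssl=1;httpPath=<http-path>"
+        + ";AuthMech=3;UID=token;PWD=<personal-access-token>";
+
+    // Requires the Databricks Simba JDBC driver jar on the classpath.
+    Class.forName("com.simba.spark.jdbc.Driver");
+
+    try (Connection connection = DriverManager.getConnection(url);
+        Statement statement = connection.createStatement()) {
+      // The "Location" row of the output should point at
+      // s3://<bucket-name>/<bucket-path>/<database-schema>/<stream-name>.
+      try (ResultSet rows = statement.executeQuery("DESCRIBE TABLE EXTENDED public.users")) {
+        while (rows.next()) {
+          System.out.println(rows.getString(1) + "\t" + rows.getString(2));
+        }
+      }
+    }
+  }
+
+}
+```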
+ +## Output Schema + +Each table will have the following columns: + +| Column | Type | Notes | +| :--- | :---: | :--- | +| `_airbyte_ab_id` | string | UUID. | +| `_airbyte_emitted_at` | timestamp | Data emission timestamp. | +| Data fields from the source stream | various | All fields in the staging Parquet files will be expanded in the table. | + +Learn how source data is converted to Parquet and the current limitations [here](https://docs.airbyte.io/integrations/destinations/s3#data-schema). + +## Getting started + +### Requirements + +1. Credentials for a Databricks cluster. See [documentation](https://docs.databricks.com/clusters/create.html). +2. Credentials for an S3 bucket. See [documentation](https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html#access-keys-and-secret-access-keys). +3. Grant the Databricks cluster full access to the S3 bucket. Or mount it as Databricks File System (DBFS). See [documentation](https://docs.databricks.com/data/data-sources/aws/amazon-s3.html). + +## CHANGELOG + +| Version | Date | Pull Request | Subject | +| :--- | :--- | :--- | :--- | +| 0.1.0 | 2021-09-14 | [#5998](https://github.com/airbytehq/airbyte/pull/5998) | Initial private release. | diff --git a/docs/integrations/destinations/s3.md b/docs/integrations/destinations/s3.md index 0db07a2e9877..af7e232fe9b0 100644 --- a/docs/integrations/destinations/s3.md +++ b/docs/integrations/destinations/s3.md @@ -100,7 +100,7 @@ Here is the available compression codecs: Under the hood, an Airbyte data stream in Json schema is converted to an Avro schema, and then the Json object is converted to an Avro record based on the Avro schema. Because the data stream can come from any data source, the Avro S3 destination connector has the following arbitrary rules. -1. Json schema types are mapped to Avro typea as follows: +1. Json schema types are mapped to Avro types as follows: | Json Data Type | Avro Data Type | | :---: | :---: | @@ -350,7 +350,7 @@ Under the hood, an Airbyte data stream in Json schema is first converted to an A ### Requirements 1. Allow connections from Airbyte server to your AWS S3/ Minio S3 cluster \(if they exist in separate VPCs\). -2. An S3 bucket with credentials \(for the COPY strategy\). +2. An S3 bucket with credentials. ### Setup guide diff --git a/tools/bin/ci_credentials.sh b/tools/bin/ci_credentials.sh index 3a73deff0fc9..59c691dea555 100755 --- a/tools/bin/ci_credentials.sh +++ b/tools/bin/ci_credentials.sh @@ -24,6 +24,7 @@ function write_standard_creds() { # Please maintain this organisation and alphabetise. write_standard_creds destination-bigquery "$BIGQUERY_INTEGRATION_TEST_CREDS" "credentials.json" write_standard_creds destination-bigquery-denormalized "$BIGQUERY_DENORMALIZED_INTEGRATION_TEST_CREDS" "credentials.json" +write_standard_creds destination-databricks "$DESTINATION_DATABRICKS_CREDS" write_standard_creds destination-gcs "$DESTINATION_GCS_CREDS" write_standard_creds destination-kvdb "$DESTINATION_KVDB_TEST_CREDS" write_standard_creds destination-keen "$DESTINATION_KEEN_TEST_CREDS"