🎉 Destination Databricks: Support Azure storage (#15140) (#15329)
* 🎉 Destination Databricks: Support Azure storage (#15140)

* Update docs and version.

* Revert unintentional change to S3 config parsing.

* Revert unnecessary additional table property.

* change spec.json

* Ignore azure integration test

* Add issue link

* auto-bump connector version

Co-authored-by: Marcos Marx <[email protected]>
Co-authored-by: marcosmarxm <[email protected]>
Co-authored-by: Liren Tu <[email protected]>
Co-authored-by: Octavia Squidington III <[email protected]>
5 people authored Oct 15, 2022
1 parent a73ae2b commit 1b134d8
Showing 37 changed files with 1,393 additions and 352 deletions.
@@ -80,7 +80,7 @@
- name: Databricks Lakehouse
destinationDefinitionId: 072d5540-f236-4294-ba7c-ade8fd918496
dockerRepository: airbyte/destination-databricks
dockerImageTag: 0.2.6
dockerImageTag: 0.3.0
documentationUrl: https://docs.airbyte.com/integrations/destinations/databricks
icon: databricks.svg
releaseStage: alpha
@@ -1239,7 +1239,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-databricks:0.2.6"
- dockerImage: "airbyte/destination-databricks:0.3.0"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/destinations/databricks"
connectionSpecification:
@@ -1401,6 +1401,52 @@
- "{part_number}"
- "{sync_id}"
order: 7
- title: "Azure Blob Storage"
required:
- "data_source_type"
- "azure_blob_storage_account_name"
- "azure_blob_storage_container_name"
- "azure_blob_storage_sas_token"
properties:
data_source_type:
type: "string"
enum:
- "Azure_Blob_Storage"
default: "Azure_Blob_Storage"
order: 0
azure_blob_storage_endpoint_domain_name:
title: "Endpoint Domain Name"
type: "string"
default: "blob.core.windows.net"
description: "This is Azure Blob Storage endpoint domain name. Leave\
\ default value (or leave it empty if run container from command\
\ line) to use Microsoft native from example."
examples:
- "blob.core.windows.net"
order: 1
azure_blob_storage_account_name:
title: "Azure Blob Storage Account Name"
type: "string"
description: "The account's name of the Azure Blob Storage."
examples:
- "airbyte5storage"
order: 2
azure_blob_storage_container_name:
title: "Azure Blob Storage Container Name"
type: "string"
description: "The name of the Azure blob storage container."
examples:
- "airbytetestcontainername"
order: 3
azure_blob_storage_sas_token:
title: "SAS Token"
type: "string"
airbyte_secret: true
description: "Shared access signature (SAS) token to grant limited\
\ access to objects in your storage account."
examples:
- "?sv=2016-05-31&ss=b&srt=sco&sp=rwdl&se=2018-06-27T10:05:50Z&st=2017-06-27T02:05:50Z&spr=https,http&sig=bgqQwoXwxzuD2GJfagRg7VOS8hzNr3QLT7rhS8OFRLQ%3D"
order: 4
order: 7
purge_staging_data:
title: "Purge Staging Files and Tables"
@@ -10,6 +10,7 @@
import com.azure.storage.blob.specialized.AppendBlobClient;
import com.azure.storage.blob.specialized.SpecializedBlobClientBuilder;
import com.azure.storage.common.StorageSharedKeyCredential;
import io.airbyte.integrations.destination.jdbc.copy.azure.AzureBlobStorageConfig;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
@@ -42,6 +43,16 @@ public AzureBlobStorageConnectionChecker(
            .buildAppendBlobClient();
  }

  public AzureBlobStorageConnectionChecker(final AzureBlobStorageConfig azureBlobStorageConfig) {
    this.appendBlobClient =
        new SpecializedBlobClientBuilder()
            .endpoint(azureBlobStorageConfig.getEndpointUrl())
            .sasToken(azureBlobStorageConfig.getSasToken())
            .containerName(azureBlobStorageConfig.getContainerName()) // Like schema in DB
            .blobName(TEST_BLOB_NAME_PREFIX + UUID.randomUUID()) // Like table in DB
            .buildAppendBlobClient();
  }

  /*
   * This a kinda test method that is used in CHECK operation to make sure all works fine with the
   * current config
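For orientation, here is a minimal sketch (not part of this commit) of how the new constructor could be exercised during a CHECK operation. The JSON field names follow the Azure spec block added above and are assumed to match what `AzureBlobStorageConfig.getAzureBlobConfig` expects; the values, and the class name `AzureCheckSketch`, are placeholders.

```java
// Minimal sketch (not from this commit): wire a data_source JSON into the new checker.
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.airbyte.integrations.destination.azure_blob_storage.AzureBlobStorageConnectionChecker;
import io.airbyte.integrations.destination.jdbc.copy.azure.AzureBlobStorageConfig;

public class AzureCheckSketch {

  public static void main(final String[] args) throws Exception {
    // Placeholder values; field names mirror the spec added in this commit.
    final JsonNode dataSource = new ObjectMapper().readTree(
        "{\"azure_blob_storage_endpoint_domain_name\": \"blob.core.windows.net\","
            + "\"azure_blob_storage_account_name\": \"account\","
            + "\"azure_blob_storage_container_name\": \"container\","
            + "\"azure_blob_storage_sas_token\": \"token\"}");
    // Same factory used by DatabricksAzureBlobStorageConfig later in this diff.
    final AzureBlobStorageConfig config = AzureBlobStorageConfig.getAzureBlobConfig(dataSource);
    // attemptWriteAndDelete() writes and deletes a test blob, as the CHECK flow does.
    new AzureBlobStorageConnectionChecker(config).attemptWriteAndDelete();
  }
}
```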
@@ -2,7 +2,7 @@

This destination syncs data to Delta Lake on Databricks Lakehouse. It does so in two steps:

1. Persist source data in S3 staging files in the Parquet format.
2. Create delta table based on the Parquet staging files.
1. Persist source data in S3 staging files in the Parquet format, or in Azure blob storage staging files in the CSV format.
2. Create a delta table based on the staging files.

See [this](https://docs.airbyte.io/integrations/destinations/databricks) link for the nuances about the connector.
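As a rough illustration of step 1 on the Azure path, staging a CSV chunk amounts to appending bytes to a blob with the same client builder the checker above uses. This is a hedged sketch, not the connector's actual copier code; the endpoint, container, and blob names are placeholders.

```java
// Hedged sketch of CSV staging to Azure Blob Storage; not taken from this commit.
import com.azure.storage.blob.specialized.AppendBlobClient;
import com.azure.storage.blob.specialized.SpecializedBlobClientBuilder;
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;

public class AzureStagingSketch {

  public static void main(final String[] args) {
    final AppendBlobClient blob = new SpecializedBlobClientBuilder()
        .endpoint("https://account.blob.core.windows.net")
        .sasToken("<sas-token>")
        .containerName("staging-container")   // plays the role of a schema
        .blobName("users/2022_10_15_0.csv")   // one staging file per stream chunk
        .buildAppendBlobClient();
    blob.create();                            // create the empty append blob
    final byte[] rows = "id,name\n1,alice\n".getBytes(StandardCharsets.UTF_8);
    blob.appendBlock(new ByteArrayInputStream(rows), rows.length);
  }
}
```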
@@ -16,5 +16,5 @@ ENV APPLICATION destination-databricks

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.2.6
LABEL io.airbyte.version=0.3.0
LABEL io.airbyte.name=airbyte/destination-databricks
@@ -15,12 +15,14 @@ From the Airbyte repository root, run:
```

#### Create credentials
**If you are a community contributor**, you will need access to AWS S3 and Databricks cluster to run the integration tests:
**If you are a community contributor**, you will need access to AWS S3, Azure Blob Storage, and a Databricks cluster to run the integration tests:

- Create a Databricks cluster. See [documentation](https://docs.databricks.com/clusters/create.html).
- Create an S3 bucket. See [documentation](https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html#access-keys-and-secret-access-keys).
- Grant the Databricks cluster full access to the S3 bucket. Or mount it as Databricks File System (DBFS). See [documentation](https://docs.databricks.com/data/data-sources/aws/amazon-s3.html).
- Create an Azure storage container.
- Grant the Databricks cluster full access to the S3 bucket and Azure container. Or mount it as Databricks File System (DBFS). See [documentation](https://docs.databricks.com/data/data-sources/aws/amazon-s3.html).
- Place both Databricks and S3 credentials in `sample_secrets/config.json`, which conforms to the spec file in `src/main/resources/spec.json`.
- Place both Databricks and Azure credentials in `sample_secrets/azure_config.json`, which conforms to the spec file in `src/main/resources/spec.json`.
- Rename the directory from `sample_secrets` to `secrets`.
- Note that the `secrets` directory is git-ignored by default, so there is no danger of accidentally checking in sensitive information.

@@ -32,6 +32,8 @@ dependencies {
implementation project(':airbyte-integrations:bases:base-java-s3')
implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
implementation project(':airbyte-integrations:connectors:destination-jdbc')
implementation project(':airbyte-integrations:connectors:destination-s3')
implementation project(':airbyte-integrations:connectors:destination-azure-blob-storage')
implementation group: 'com.databricks', name: 'databricks-jdbc', version: '2.6.25'

// parquet
@@ -46,6 +48,8 @@ dependencies {
}
implementation ('org.apache.parquet:parquet-avro:1.12.0') { exclude group: 'org.slf4j', module: 'slf4j-log4j12'}
implementation ('com.github.airbytehq:json-avro-converter:1.0.1') { exclude group: 'ch.qos.logback', module: 'logback-classic'}

implementation 'com.azure:azure-storage-blob:12.18.0'

integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-databricks')
@@ -0,0 +1,17 @@
{
  "accept_terms": true,
  "databricks_server_hostname": "abc-12345678-wxyz.cloud.databricks.com",
  "databricks_http_path": "sql/protocolvx/o/1234567489/0000-1111111-abcd90",
  "databricks_port": "443",
  "databricks_personal_access_token": "dapi0123456789abcdefghij0123456789AB",
  "database_schema": "public",
  "data_source": {
    "data_source_type": "Azure_Blob_Storage",
    "azure_blob_storage_endpoint_domain_name": "blob.core.windows.net",
    "azure_blob_storage_account_name": "account",
    "azure_blob_storage_sas_token": "token",
    "azure_blob_storage_container_name": "container",
    "azure_blob_storage_output_buffer_size": 5
  },
  "purge_staging_data": false
}
@@ -0,0 +1,23 @@
/*
 * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
 */

package io.airbyte.integrations.destination.databricks;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.integrations.destination.jdbc.copy.azure.AzureBlobStorageConfig;

public class DatabricksAzureBlobStorageConfig extends DatabricksStorageConfig {

  private final AzureBlobStorageConfig azureConfig;

  public DatabricksAzureBlobStorageConfig(JsonNode config) {
    this.azureConfig = AzureBlobStorageConfig.getAzureBlobConfig(config);
  }

  @Override
  public AzureBlobStorageConfig getAzureBlobStorageConfigOrThrow() {
    return azureConfig;
  }

}
@@ -0,0 +1,24 @@
/*
 * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
 */

package io.airbyte.integrations.destination.databricks;

import io.airbyte.integrations.destination.azure_blob_storage.AzureBlobStorageConnectionChecker;
import io.airbyte.integrations.destination.jdbc.copy.azure.AzureBlobStorageConfig;

public class DatabricksAzureBlobStorageDestination extends DatabricksBaseDestination {

  @Override
  protected void checkPersistence(DatabricksStorageConfig databricksConfig) {
    AzureBlobStorageConfig azureConfig = databricksConfig.getAzureBlobStorageConfigOrThrow();
    final AzureBlobStorageConnectionChecker client = new AzureBlobStorageConnectionChecker(azureConfig);
    client.attemptWriteAndDelete();
  }

  @Override
  protected DatabricksStreamCopierFactory getStreamCopierFactory() {
    return new DatabricksAzureBlobStorageStreamCopierFactory();
  }

}
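The page truncates the remaining changed files here, but the destination classes shown above imply a small dispatch step on `data_source_type`. The following is a hedged sketch of what that routing could look like; the class `DatabricksDestinationRoutingSketch`, the method `forConfig`, and the S3 counterpart `DatabricksS3Destination` are hypothetical names for illustration, not taken from this commit.

```java
// Hypothetical routing sketch; names below are illustrative only.
import com.fasterxml.jackson.databind.JsonNode;

public class DatabricksDestinationRoutingSketch {

  static DatabricksBaseDestination forConfig(final JsonNode config) {
    final String type = config.get("data_source").get("data_source_type").asText();
    if ("Azure_Blob_Storage".equals(type)) {
      return new DatabricksAzureBlobStorageDestination();
    }
    return new DatabricksS3Destination(); // hypothetical S3 counterpart
  }
}
```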