🎉 New Destination: Google Cloud Storage (#4784)
* Adding Google Cloud Storage as a destination

* Removed a few comments and amended the version

* Added documentation in docs/integrations/destinations/gcs.md

* Amended gcs.md with the right pull id

* Implemented all the fixes requested by tuliren as per #4329

* Renaming all the files

* Branch aligned to S3 0.1.7 (with Avro and JSONL). Removed a redundant file by making S3 a dependency for GCS

* Removed some additional duplicates between GCS and S3

* Revert changes in the root files

* Revert jdbc files

* Fix package names

* Refactor gcs config

* Format code

* Fix gcs connection

* Format code

* Add acceptance tests

* Fix parquet acceptance test

* Add ci credentials

* Register the connector and update documentation

* Fix typo

* Format code

* Add unit test

* Add comments

* Update readme

Co-authored-by: Sherif A. Nada <[email protected]>

Co-authored-by: Marco Fontana <[email protected]>
Co-authored-by: [email protected] <[email protected]>
5 people authored and gl-pix committed Jul 22, 2021
1 parent e2d4b4f commit 9d7ab35
Showing 40 changed files with 2,787 additions and 5 deletions.
1 change: 1 addition & 0 deletions .github/workflows/publish-command.yml
@@ -139,6 +139,7 @@ jobs:
ZOOM_INTEGRATION_TEST_CREDS: ${{ secrets.ZOOM_INTEGRATION_TEST_CREDS }}
PLAID_INTEGRATION_TEST_CREDS: ${{ secrets.PLAID_INTEGRATION_TEST_CREDS }}
DESTINATION_S3_INTEGRATION_TEST_CREDS: ${{ secrets.DESTINATION_S3_INTEGRATION_TEST_CREDS }}
DESTINATION_GCS_CREDS: ${{ secrets.DESTINATION_GCS_CREDS }}
- run: |
docker login -u airbytebot -p ${DOCKER_PASSWORD}
./tools/integrations/manage.sh publish airbyte-integrations/${{ github.event.inputs.connector }} ${{ github.event.inputs.run-tests }}
1 change: 1 addition & 0 deletions .github/workflows/test-command.yml
@@ -137,6 +137,7 @@ jobs:
ZOOM_INTEGRATION_TEST_CREDS: ${{ secrets.ZOOM_INTEGRATION_TEST_CREDS }}
PLAID_INTEGRATION_TEST_CREDS: ${{ secrets.PLAID_INTEGRATION_TEST_CREDS }}
DESTINATION_S3_INTEGRATION_TEST_CREDS: ${{ secrets.DESTINATION_S3_INTEGRATION_TEST_CREDS }}
DESTINATION_GCS_CREDS: ${{ secrets.DESTINATION_GCS_CREDS }}
- run: |
./tools/bin/ci_integration_test.sh ${{ github.event.inputs.connector }}
name: test ${{ github.event.inputs.connector }}
@@ -0,0 +1,7 @@
{
"destinationDefinitionId": "ca8f6566-e555-4b40-943a-545bf123117a",
"name": "Google Cloud Storage (GCS)",
"dockerRepository": "airbyte/destination-gcs",
"dockerImageTag": "0.1.0",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/gcs"
}
@@ -24,6 +24,11 @@
dockerRepository: airbyte/destination-bigquery-denormalized
dockerImageTag: 0.1.0
documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery
- destinationDefinitionId: ca8f6566-e555-4b40-943a-545bf123117a
name: Google Cloud Storage (GCS)
dockerRepository: airbyte/destination-gcs
dockerImageTag: 0.1.0
documentationUrl: https://docs.airbyte.io/integrations/destinations/gcs
- destinationDefinitionId: 356668e2-7e34-47f3-a3b0-67a8a481b692
name: Google PubSub
dockerRepository: airbyte/destination-pubsub
6 changes: 4 additions & 2 deletions airbyte-integrations/builds.md
@@ -129,14 +129,16 @@
# Destinations
BigQuery [![destination-bigquery](https://img.shields.io/endpoint?url=https%3A%2F%2Fstatus-api.airbyte.io%2Ftests%2Fsummary%2Fdestination-bigquery%2Fbadge.json)](https://status-api.airbyte.io/tests/summary/destination-bigquery)

Google Cloud Storage (GCS) [![destination-gcs](https://img.shields.io/endpoint?url=https%3A%2F%2Fstatus-api.airbyte.io%2Ftests%2Fsummary%2Fdestination-gcs%2Fbadge.json)](https://status-api.airbyte.io/tests/summary/destination-gcs)

Google PubSub [![destination-pubsub](https://img.shields.io/endpoint?url=https%3A%2F%2Fstatus-api.airbyte.io%2Ftests%2Fsummary%2Fdestination-pubsub%2Fbadge.json)](https://status-api.airbyte.io/tests/summary/destination-pubsub)

Local CSV [![destination-csv](https://img.shields.io/endpoint?url=https%3A%2F%2Fstatus-api.airbyte.io%2Ftests%2Fsummary%2Fdestination-csv%2Fbadge.json)](https://status-api.airbyte.io/tests/summary/destination-csv)

Local JSON [![destination-local-json](https://img.shields.io/endpoint?url=https%3A%2F%2Fstatus-api.airbyte.io%2Ftests%2Fsummary%2Fdestination-local-json%2Fbadge.json)](https://status-api.airbyte.io/tests/summary/destination-local-json)

Postgres [![destination-postgres](https://img.shields.io/endpoint?url=https%3A%2F%2Fstatus-api.airbyte.io%2Ftests%2Fsummary%2Fdestination-postgres%2Fbadge.json)](https://status-api.airbyte.io/tests/summary/destination-postgres)

Redshift [![destination-redshift](https://img.shields.io/endpoint?url=https%3A%2F%2Fstatus-api.airbyte.io%2Ftests%2Fsummary%2Fdestination-redshift%2Fbadge.json)](https://status-api.airbyte.io/tests/summary/destination-redshift)

S3 [![destination-s3](https://img.shields.io/endpoint?url=https%3A%2F%2Fstatus-api.airbyte.io%2Ftests%2Fsummary%2Fdestination-s3%2Fbadge.json)](https://status-api.airbyte.io/tests/summary/destination-s3)
3 changes: 3 additions & 0 deletions airbyte-integrations/connectors/destination-gcs/.dockerignore
@@ -0,0 +1,3 @@
*
!Dockerfile
!build
11 changes: 11 additions & 0 deletions airbyte-integrations/connectors/destination-gcs/Dockerfile
@@ -0,0 +1,11 @@
FROM airbyte/integration-base-java:dev

WORKDIR /airbyte
ENV APPLICATION destination-gcs

COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.name=airbyte/destination-gcs
26 changes: 26 additions & 0 deletions airbyte-integrations/connectors/destination-gcs/README.md
@@ -0,0 +1,26 @@
# Destination Google Cloud Storage (GCS)

In order to test the GCS destination, you need a Google Cloud Platform account.

## Community Contributor

As a community contributor, you can follow these steps to run integration tests.

- Create a GCS bucket for testing.
- Generate an [HMAC key](https://cloud.google.com/storage/docs/authentication/hmackeys) for the bucket with read and write permissions. Note that currently only the HMAC key credential is supported; more credential types will be added in the future.
- Paste the bucket and key information into the config files under [`./sample_secrets`](./sample_secrets).
- Rename the directory from `sample_secrets` to `secrets`.
- Feel free to test different settings by modifying the config in the acceptance test files (e.g. `GcsCsvDestinationAcceptanceTest.java`, method `getFormatConfig`), as long as the config follows the schema defined in [spec.json](src/main/resources/spec.json).

## Airbyte Employee

- Access the `destination gcs creds` secret on LastPass and put it in `sample_secrets/config.json`.
- Rename the directory from `sample_secrets` to `secrets`.

## Add New Output Format
- Add a new enum in `S3Format`.
- Modify `spec.json` to specify the configuration of this new format.
- Update `S3FormatConfigs` to be able to construct a config for this new format.
- Create a new package under `io.airbyte.integrations.destination.gcs`.
- Implement a new `GcsWriter`. The implementation can extend `BaseGcsWriter` (see the sketch after this list).
- Write an acceptance test for the new output format. The test can extend `GcsDestinationAcceptanceTest`.
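
For illustration, here is a minimal sketch of what such a writer could look like. It is not part of this changeset: `GcsJsonlWriter` is a hypothetical name, the `throws IOException` clauses are assumed, and real implementations extend `BaseGcsWriter` (not shown on this page) rather than implementing `S3Writer` directly. The three methods mirror how `GcsConsumer` below drives its writers.

```java
package io.airbyte.integrations.destination.gcs.jsonl;

import io.airbyte.integrations.destination.s3.writer.S3Writer;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import java.io.IOException;
import java.util.UUID;

// Hypothetical skeleton for a new output format. GcsConsumer calls
// initialize() once per stream, write() once per record, and close()
// exactly once at the end of the sync.
public class GcsJsonlWriter implements S3Writer {

  @Override
  public void initialize() throws IOException {
    // Start the (multipart) upload to the target GCS object.
  }

  @Override
  public void write(UUID id, AirbyteRecordMessage recordMessage) throws IOException {
    // Serialize the record as one JSON line and append it to the upload stream.
  }

  @Override
  public void close(boolean hasFailed) throws IOException {
    // Complete the upload on success; abort it when hasFailed is true.
  }

}
```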
38 changes: 38 additions & 0 deletions airbyte-integrations/connectors/destination-gcs/build.gradle
@@ -0,0 +1,38 @@
plugins {
id 'application'
id 'airbyte-docker'
id 'airbyte-integration-test-java'
}

application {
mainClass = 'io.airbyte.integrations.destination.gcs.GcsDestination'
}

dependencies {
implementation project(':airbyte-config:models')
implementation project(':airbyte-protocol:models')
implementation project(':airbyte-integrations:bases:base-java')
implementation project(':airbyte-integrations:connectors:destination-jdbc')
implementation project(':airbyte-integrations:connectors:destination-s3')
implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)

implementation platform('com.amazonaws:aws-java-sdk-bom:1.12.14')
implementation 'com.google.cloud.bigdataoss:gcs-connector:hadoop3-2.2.1'

// csv
implementation 'com.amazonaws:aws-java-sdk-s3:1.11.978'
implementation 'org.apache.commons:commons-csv:1.4'
implementation 'com.github.alexmojaki:s3-stream-upload:2.2.2'

// parquet
implementation group: 'org.apache.hadoop', name: 'hadoop-common', version: '3.3.0'
implementation group: 'org.apache.hadoop', name: 'hadoop-aws', version: '3.3.0'
implementation group: 'org.apache.hadoop', name: 'hadoop-mapreduce-client-core', version: '3.3.0'
implementation group: 'org.apache.parquet', name: 'parquet-avro', version: '1.12.0'
implementation group: 'tech.allegro.schema.json2avro', name: 'converter', version: '0.2.10'

testImplementation 'org.apache.commons:commons-lang3:3.11'

integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-gcs')
}
@@ -0,0 +1,10 @@
{
"gcs_bucket_name": "<bucket-name>",
"gcs_bucket_path": "integration-test",
"gcs_bucket_region": "<region>",
"credential": {
"credential_type": "HMAC_KEY",
"hmac_key_access_id": "<access-id>",
"hmac_key_secret": "<secret>"
}
}
@@ -0,0 +1,119 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.integrations.destination.gcs;

import com.amazonaws.services.s3.AmazonS3;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer;
import io.airbyte.integrations.destination.gcs.writer.GcsWriterFactory;
import io.airbyte.integrations.destination.s3.writer.S3Writer;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import java.sql.Timestamp;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.function.Consumer;

public class GcsConsumer extends FailureTrackingAirbyteMessageConsumer {

private final GcsDestinationConfig gcsDestinationConfig;
private final ConfiguredAirbyteCatalog configuredCatalog;
private final GcsWriterFactory writerFactory;
private final Consumer<AirbyteMessage> outputRecordCollector;
private final Map<AirbyteStreamNameNamespacePair, S3Writer> streamNameAndNamespaceToWriters;

private AirbyteMessage lastStateMessage = null;

public GcsConsumer(GcsDestinationConfig gcsDestinationConfig,
ConfiguredAirbyteCatalog configuredCatalog,
GcsWriterFactory writerFactory,
Consumer<AirbyteMessage> outputRecordCollector) {
this.gcsDestinationConfig = gcsDestinationConfig;
this.configuredCatalog = configuredCatalog;
this.writerFactory = writerFactory;
this.outputRecordCollector = outputRecordCollector;
this.streamNameAndNamespaceToWriters = new HashMap<>(configuredCatalog.getStreams().size());
}

@Override
protected void startTracked() throws Exception {
AmazonS3 s3Client = GcsS3Helper.getGcsS3Client(gcsDestinationConfig);

Timestamp uploadTimestamp = new Timestamp(System.currentTimeMillis());

for (ConfiguredAirbyteStream configuredStream : configuredCatalog.getStreams()) {
S3Writer writer = writerFactory
.create(gcsDestinationConfig, s3Client, configuredStream, uploadTimestamp);
writer.initialize();

AirbyteStream stream = configuredStream.getStream();
AirbyteStreamNameNamespacePair streamNamePair = AirbyteStreamNameNamespacePair
.fromAirbyteSteam(stream);
streamNameAndNamespaceToWriters.put(streamNamePair, writer);
}
}

@Override
protected void acceptTracked(AirbyteMessage airbyteMessage) throws Exception {
if (airbyteMessage.getType() == Type.STATE) {
this.lastStateMessage = airbyteMessage;
return;
} else if (airbyteMessage.getType() != Type.RECORD) {
return;
}

AirbyteRecordMessage recordMessage = airbyteMessage.getRecord();
AirbyteStreamNameNamespacePair pair = AirbyteStreamNameNamespacePair
.fromRecordMessage(recordMessage);

if (!streamNameAndNamespaceToWriters.containsKey(pair)) {
throw new IllegalArgumentException(
String.format(
"Message contained record from a stream that was not in the catalog. \ncatalog: %s , \nmessage: %s",
Jsons.serialize(configuredCatalog), Jsons.serialize(recordMessage)));
}

UUID id = UUID.randomUUID();
streamNameAndNamespaceToWriters.get(pair).write(id, recordMessage);
}

@Override
protected void close(boolean hasFailed) throws Exception {
for (S3Writer handler : streamNameAndNamespaceToWriters.values()) {
handler.close(hasFailed);
}
// The GCS stream upload is all-or-nothing: if anything failed in the destination, emit no state message.
if (!hasFailed) {
outputRecordCollector.accept(lastStateMessage);
}
}

}
@@ -0,0 +1,76 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.integrations.destination.gcs;

import com.amazonaws.services.s3.AmazonS3;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.integrations.BaseConnector;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.destination.gcs.writer.GcsWriterFactory;
import io.airbyte.integrations.destination.gcs.writer.ProductionWriterFactory;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GcsDestination extends BaseConnector implements Destination {

private static final Logger LOGGER = LoggerFactory.getLogger(GcsDestination.class);

public static void main(String[] args) throws Exception {
new IntegrationRunner(new GcsDestination()).run(args);
}

@Override
public AirbyteConnectionStatus check(JsonNode config) {
try {
GcsDestinationConfig destinationConfig = GcsDestinationConfig.getGcsDestinationConfig(config);
AmazonS3 s3Client = GcsS3Helper.getGcsS3Client(destinationConfig);
s3Client.putObject(destinationConfig.getBucketName(), "test", "check-content");
s3Client.deleteObject(destinationConfig.getBucketName(), "test");
return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED);
} catch (Exception e) {
LOGGER.error("Exception attempting to access the GCS bucket: {}", e.getMessage());
return new AirbyteConnectionStatus()
.withStatus(AirbyteConnectionStatus.Status.FAILED)
.withMessage("Could not connect to the GCS bucket with the provided configuration. \n" + e
.getMessage());
}
}

@Override
public AirbyteMessageConsumer getConsumer(JsonNode config,
ConfiguredAirbyteCatalog configuredCatalog,
Consumer<AirbyteMessage> outputRecordCollector) {
GcsWriterFactory formatterFactory = new ProductionWriterFactory();
return new GcsConsumer(GcsDestinationConfig.getGcsDestinationConfig(config), configuredCatalog, formatterFactory, outputRecordCollector);
}

}
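
`GcsS3Helper` is part of this changeset but not shown on this page. The idea it encapsulates is that GCS exposes an S3-interoperable XML API, so the connector can reuse the AWS S3 client (and the whole `destination-s3` writer stack) by pointing it at `storage.googleapis.com` with HMAC credentials. A rough sketch under that assumption follows; the `GcsDestinationConfig` accessor names below are guesses, not the connector's actual API.

```java
package io.airbyte.integrations.destination.gcs;

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;

// Sketch only: builds an AWS S3 client against the GCS interoperability
// endpoint using the HMAC key pair from the connector config. The accessor
// names (getHmacKeyAccessId, getHmacKeySecret, getBucketRegion) are assumed.
public class GcsS3Helper {

  private static final String GCS_ENDPOINT = "https://storage.googleapis.com";

  public static AmazonS3 getGcsS3Client(GcsDestinationConfig config) {
    BasicAWSCredentials credentials =
        new BasicAWSCredentials(config.getHmacKeyAccessId(), config.getHmacKeySecret());
    return AmazonS3ClientBuilder.standard()
        .withEndpointConfiguration(
            new AwsClientBuilder.EndpointConfiguration(GCS_ENDPOINT, config.getBucketRegion()))
        .withCredentials(new AWSStaticCredentialsProvider(credentials))
        .build();
  }

}
```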
