From 58c411bfd0638f09bc5c6dacf76a591404ba7a10 Mon Sep 17 00:00:00 2001 From: Yevhen Sukhomud Date: Mon, 19 Dec 2022 16:34:50 +0700 Subject: [PATCH 1/2] Update destination-gcs to handle v1 data protocol --- .../gcs/GcsAvroDestinationAcceptanceTest.java | 6 +++++ ...sAvroParquetDestinationAcceptanceTest.java | 8 ++++++- .../gcs/GcsAvroTestDataComparator.java | 22 +++++++++++++++++++ .../gcs/GcsCsvDestinationAcceptanceTest.java | 10 ++++++++- .../GcsCsvGzipDestinationAcceptanceTest.java | 6 +++++ .../gcs/GcsDestinationAcceptanceTest.java | 6 +++++ .../GcsJsonlDestinationAcceptanceTest.java | 6 +++++ ...GcsJsonlGzipDestinationAcceptanceTest.java | 6 +++++ .../GcsParquetDestinationAcceptanceTest.java | 6 +++++ 9 files changed, 74 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsAvroDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsAvroDestinationAcceptanceTest.java index b26796e9a0d9..c5d0c0375687 100644 --- a/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsAvroDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsAvroDestinationAcceptanceTest.java @@ -13,6 +13,7 @@ import io.airbyte.integrations.destination.s3.avro.AvroConstants; import io.airbyte.integrations.destination.s3.avro.JsonFieldNameUpdater; import io.airbyte.integrations.destination.s3.util.AvroRecordHelper; +import io.airbyte.integrations.standardtest.destination.ProtocolVersion; import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator; import java.util.HashMap; import java.util.LinkedList; @@ -96,4 +97,9 @@ protected Map> retrieveDataTypesFromPersistedFiles(final Strin return resultDataTypes; } + @Override + public ProtocolVersion getProtocolVersion() { + return ProtocolVersion.V1; + } + } diff --git a/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsAvroParquetDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsAvroParquetDestinationAcceptanceTest.java index b34e45590451..96917d38d61f 100644 --- a/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsAvroParquetDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsAvroParquetDestinationAcceptanceTest.java @@ -11,6 +11,7 @@ import io.airbyte.commons.resources.MoreResources; import io.airbyte.integrations.destination.s3.S3Format; import io.airbyte.integrations.destination.s3.avro.JsonSchemaType; +import io.airbyte.integrations.standardtest.destination.ProtocolVersion; import io.airbyte.integrations.standardtest.destination.argproviders.NumberDataTypeTestArgumentProvider; import io.airbyte.protocol.models.v0.AirbyteCatalog; import io.airbyte.protocol.models.v0.AirbyteMessage; @@ -39,6 +40,11 @@ public GcsAvroParquetDestinationAcceptanceTest(final S3Format s3Format) { super(s3Format); } + @Override + public ProtocolVersion getProtocolVersion() { + return ProtocolVersion.V1; + } + @ParameterizedTest @ArgumentsSource(NumberDataTypeTestArgumentProvider.class) public void testNumberDataType(final String catalogFileName, final String messagesFileName) throws Exception { @@ -85,7 +91,7 @@ private JsonNode getJsonNode(final AirbyteStream stream, final String name) { } private Set getExpectedSchemaType(final JsonNode fieldDefinition) { - final JsonNode typeProperty = fieldDefinition.get("type"); + final JsonNode typeProperty = fieldDefinition.get("type") == null ? fieldDefinition.get("$ref") : fieldDefinition.get("type"); final JsonNode airbyteTypeProperty = fieldDefinition.get("airbyte_type"); final String airbyteTypePropertyText = airbyteTypeProperty == null ? null : airbyteTypeProperty.asText(); return Arrays.stream(JsonSchemaType.values()) diff --git a/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsAvroTestDataComparator.java b/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsAvroTestDataComparator.java index 427c6440c34b..34224612ee9a 100644 --- a/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsAvroTestDataComparator.java +++ b/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsAvroTestDataComparator.java @@ -4,9 +4,12 @@ package io.airbyte.integrations.destination.gcs; +import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator; +import java.nio.charset.StandardCharsets; import java.time.*; import java.time.format.DateTimeFormatter; +import java.util.Base64; public class GcsAvroTestDataComparator extends AdvancedTestDataComparator { @@ -33,4 +36,23 @@ protected boolean compareDateTimeValues(String airbyteMessageValue, String desti return super.compareDateTimeValues(airbyteMessageValue, format.format(dateTime)); } + @Override + protected boolean compareTime(final String airbyteMessageValue, final String destinationValue) { + var destinationDate = LocalTime.ofInstant(getInstantFromEpoch(destinationValue), ZoneOffset.UTC); + var expectedDate = LocalTime.parse(airbyteMessageValue, DateTimeFormatter.ISO_TIME); + return expectedDate.equals(destinationDate); + } + + @Override + protected boolean compareString(final JsonNode expectedValue, final JsonNode actualValue) { + // to handle base64 encoded strings + return expectedValue.asText().equals(actualValue.asText()) + || decodeBase64(expectedValue.asText()).equals(actualValue.asText()); + } + + private String decodeBase64(String string) { + byte[] decoded = Base64.getDecoder().decode(string); + return new String(decoded, StandardCharsets.UTF_8); + } + } diff --git a/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsCsvDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsCsvDestinationAcceptanceTest.java index 1d63c0768ffd..b974ded6b9ce 100644 --- a/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsCsvDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsCsvDestinationAcceptanceTest.java @@ -12,6 +12,7 @@ import io.airbyte.integrations.base.JavaBaseConstants; import io.airbyte.integrations.destination.s3.S3Format; import io.airbyte.integrations.destination.s3.csv.S3CsvFormatConfig.Flattening; +import io.airbyte.integrations.standardtest.destination.ProtocolVersion; import java.io.IOException; import java.io.InputStreamReader; import java.io.Reader; @@ -33,6 +34,11 @@ public GcsCsvDestinationAcceptanceTest() { super(S3Format.CSV); } + @Override + public ProtocolVersion getProtocolVersion() { + return ProtocolVersion.V1; + } + @Override protected JsonNode getFormatConfig() { return Jsons.jsonNode(Map.of( @@ -50,7 +56,9 @@ private static Map getFieldTypes(final JsonNode streamSchema) { final Iterator> iterator = fieldDefinitions.fields(); while (iterator.hasNext()) { final Map.Entry entry = iterator.next(); - fieldTypes.put(entry.getKey(), entry.getValue().get("type").asText()); + JsonNode fieldValue = entry.getValue(); + JsonNode typeValue = fieldValue.get("type") == null ? fieldValue.get("$ref") : fieldValue.get("type"); + fieldTypes.put(entry.getKey(), typeValue.asText()); } return fieldTypes; } diff --git a/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsCsvGzipDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsCsvGzipDestinationAcceptanceTest.java index c21e7545f5ca..b6bdf3156553 100644 --- a/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsCsvGzipDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsCsvGzipDestinationAcceptanceTest.java @@ -8,6 +8,7 @@ import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.destination.s3.csv.S3CsvFormatConfig.Flattening; +import io.airbyte.integrations.standardtest.destination.ProtocolVersion; import java.io.IOException; import java.io.InputStreamReader; import java.io.Reader; @@ -17,6 +18,11 @@ public class GcsCsvGzipDestinationAcceptanceTest extends GcsCsvDestinationAcceptanceTest { + @Override + public ProtocolVersion getProtocolVersion() { + return ProtocolVersion.V1; + } + @Override protected JsonNode getFormatConfig() { // config without compression defaults to GZIP diff --git a/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsDestinationAcceptanceTest.java index d2319e6623df..bb873a3a288f 100644 --- a/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsDestinationAcceptanceTest.java @@ -23,6 +23,7 @@ import io.airbyte.integrations.destination.s3.S3FormatConfig; import io.airbyte.integrations.destination.s3.S3StorageOperations; import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest; +import io.airbyte.integrations.standardtest.destination.ProtocolVersion; import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator; import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator; import io.airbyte.protocol.models.v0.AirbyteConnectionStatus; @@ -72,6 +73,11 @@ protected JsonNode getBaseConfigJson() { return Jsons.deserialize(IOs.readFile(Path.of(SECRET_FILE_PATH))); } + @Override + public ProtocolVersion getProtocolVersion() { + return ProtocolVersion.V1; + } + @Override protected String getImageName() { return "airbyte/destination-gcs:dev"; diff --git a/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsJsonlDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsJsonlDestinationAcceptanceTest.java index 6f2d1855455b..6b916b9a0f84 100644 --- a/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsJsonlDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsJsonlDestinationAcceptanceTest.java @@ -10,6 +10,7 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.base.JavaBaseConstants; import io.airbyte.integrations.destination.s3.S3Format; +import io.airbyte.integrations.standardtest.destination.ProtocolVersion; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; @@ -24,6 +25,11 @@ public GcsJsonlDestinationAcceptanceTest() { super(S3Format.JSONL); } + @Override + public ProtocolVersion getProtocolVersion() { + return ProtocolVersion.V1; + } + @Override protected JsonNode getFormatConfig() { return Jsons.jsonNode(Map.of( diff --git a/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsJsonlGzipDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsJsonlGzipDestinationAcceptanceTest.java index b7b81c69cff3..3e99f6b5d83b 100644 --- a/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsJsonlGzipDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsJsonlGzipDestinationAcceptanceTest.java @@ -7,6 +7,7 @@ import com.amazonaws.services.s3.model.S3Object; import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.standardtest.destination.ProtocolVersion; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; @@ -16,6 +17,11 @@ public class GcsJsonlGzipDestinationAcceptanceTest extends GcsJsonlDestinationAcceptanceTest { + @Override + public ProtocolVersion getProtocolVersion() { + return ProtocolVersion.V1; + } + @Override protected JsonNode getFormatConfig() { // config without compression defaults to GZIP diff --git a/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsParquetDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsParquetDestinationAcceptanceTest.java index 2a3f465b87c8..155a51838c1b 100644 --- a/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsParquetDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsParquetDestinationAcceptanceTest.java @@ -15,6 +15,7 @@ import io.airbyte.integrations.destination.s3.avro.JsonFieldNameUpdater; import io.airbyte.integrations.destination.s3.parquet.S3ParquetWriter; import io.airbyte.integrations.destination.s3.util.AvroRecordHelper; +import io.airbyte.integrations.standardtest.destination.ProtocolVersion; import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator; import java.io.IOException; import java.net.URI; @@ -37,6 +38,11 @@ public GcsParquetDestinationAcceptanceTest() { super(S3Format.PARQUET); } + @Override + public ProtocolVersion getProtocolVersion() { + return ProtocolVersion.V1; + } + @Override protected JsonNode getFormatConfig() { return Jsons.jsonNode(Map.of( From d008dda8aaacdf10b8345952d908b535a8684af5 Mon Sep 17 00:00:00 2001 From: Yevhen Sukhomud Date: Tue, 20 Dec 2022 16:23:35 +0700 Subject: [PATCH 2/2] Review changes --- .../gcs/GcsAvroParquetDestinationAcceptanceTest.java | 1 + .../destination/gcs/GcsAvroTestDataComparator.java | 10 +++++----- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsAvroParquetDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsAvroParquetDestinationAcceptanceTest.java index 96917d38d61f..4b53d1da0523 100644 --- a/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsAvroParquetDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsAvroParquetDestinationAcceptanceTest.java @@ -91,6 +91,7 @@ private JsonNode getJsonNode(final AirbyteStream stream, final String name) { } private Set getExpectedSchemaType(final JsonNode fieldDefinition) { + // The $ref is a migration to V1 data type protocol see well_known_types.yaml final JsonNode typeProperty = fieldDefinition.get("type") == null ? fieldDefinition.get("$ref") : fieldDefinition.get("type"); final JsonNode airbyteTypeProperty = fieldDefinition.get("airbyte_type"); final String airbyteTypePropertyText = airbyteTypeProperty == null ? null : airbyteTypeProperty.asText(); diff --git a/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsAvroTestDataComparator.java b/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsAvroTestDataComparator.java index 34224612ee9a..edba4c27c6c8 100644 --- a/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsAvroTestDataComparator.java +++ b/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsAvroTestDataComparator.java @@ -15,8 +15,8 @@ public class GcsAvroTestDataComparator extends AdvancedTestDataComparator { @Override protected boolean compareDateValues(String expectedValue, String actualValue) { - var destinationDate = LocalDate.ofEpochDay(Long.parseLong(actualValue)); - var expectedDate = LocalDate.parse(expectedValue, DateTimeFormatter.ofPattern(AIRBYTE_DATE_FORMAT)); + LocalDate destinationDate = LocalDate.ofEpochDay(Long.parseLong(actualValue)); + LocalDate expectedDate = LocalDate.parse(expectedValue, DateTimeFormatter.ofPattern(AIRBYTE_DATE_FORMAT)); return expectedDate.equals(destinationDate); } @@ -31,15 +31,15 @@ protected ZonedDateTime parseDestinationDateWithTz(String destinationValue) { @Override protected boolean compareDateTimeValues(String airbyteMessageValue, String destinationValue) { - var format = DateTimeFormatter.ofPattern(AIRBYTE_DATETIME_FORMAT); + DateTimeFormatter format = DateTimeFormatter.ofPattern(AIRBYTE_DATETIME_FORMAT); LocalDateTime dateTime = LocalDateTime.ofInstant(getInstantFromEpoch(destinationValue), ZoneOffset.UTC); return super.compareDateTimeValues(airbyteMessageValue, format.format(dateTime)); } @Override protected boolean compareTime(final String airbyteMessageValue, final String destinationValue) { - var destinationDate = LocalTime.ofInstant(getInstantFromEpoch(destinationValue), ZoneOffset.UTC); - var expectedDate = LocalTime.parse(airbyteMessageValue, DateTimeFormatter.ISO_TIME); + LocalTime destinationDate = LocalTime.ofInstant(getInstantFromEpoch(destinationValue), ZoneOffset.UTC); + LocalTime expectedDate = LocalTime.parse(airbyteMessageValue, DateTimeFormatter.ISO_TIME); return expectedDate.equals(destinationDate); }