Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎉 Destination-gcs handling v1 data protocol #20635

Merged
merged 3 commits into from
Jan 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.airbyte.integrations.destination.s3.avro.AvroConstants;
import io.airbyte.integrations.destination.s3.avro.JsonFieldNameUpdater;
import io.airbyte.integrations.destination.s3.util.AvroRecordHelper;
import io.airbyte.integrations.standardtest.destination.ProtocolVersion;
import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;
import java.util.HashMap;
import java.util.LinkedList;
Expand Down Expand Up @@ -96,4 +97,9 @@ protected Map<String, Set<Type>> retrieveDataTypesFromPersistedFiles(final Strin
return resultDataTypes;
}

@Override
public ProtocolVersion getProtocolVersion() {
return ProtocolVersion.V1;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.integrations.destination.s3.S3Format;
import io.airbyte.integrations.destination.s3.avro.JsonSchemaType;
import io.airbyte.integrations.standardtest.destination.ProtocolVersion;
import io.airbyte.integrations.standardtest.destination.argproviders.NumberDataTypeTestArgumentProvider;
import io.airbyte.protocol.models.v0.AirbyteCatalog;
import io.airbyte.protocol.models.v0.AirbyteMessage;
Expand Down Expand Up @@ -39,6 +40,11 @@ public GcsAvroParquetDestinationAcceptanceTest(final S3Format s3Format) {
super(s3Format);
}

@Override
public ProtocolVersion getProtocolVersion() {
return ProtocolVersion.V1;
}

@ParameterizedTest
@ArgumentsSource(NumberDataTypeTestArgumentProvider.class)
public void testNumberDataType(final String catalogFileName, final String messagesFileName) throws Exception {
Expand Down Expand Up @@ -85,7 +91,8 @@ private JsonNode getJsonNode(final AirbyteStream stream, final String name) {
}

private Set<Type> getExpectedSchemaType(final JsonNode fieldDefinition) {
final JsonNode typeProperty = fieldDefinition.get("type");
// The $ref is a migration to V1 data type protocol see well_known_types.yaml
final JsonNode typeProperty = fieldDefinition.get("type") == null ? fieldDefinition.get("$ref") : fieldDefinition.get("type");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice work, suggestion to maybe considering a comment here that notes that $ref is utilized as a result of the version migration and that type is the original standard since tests are a great way to maintain knowledge of business context

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The type is still valid since we have object and arrays (for example: "type": ["object"]) in schemas but it will be worth to mentioned the $ref in the comments since it's not obvious. Thanks for suggesting!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome, both changes look great, and thanks for making this code re-useable

final JsonNode airbyteTypeProperty = fieldDefinition.get("airbyte_type");
final String airbyteTypePropertyText = airbyteTypeProperty == null ? null : airbyteTypeProperty.asText();
return Arrays.stream(JsonSchemaType.values())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,19 @@

package io.airbyte.integrations.destination.gcs;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator;
import java.nio.charset.StandardCharsets;
import java.time.*;
import java.time.format.DateTimeFormatter;
import java.util.Base64;

public class GcsAvroTestDataComparator extends AdvancedTestDataComparator {

@Override
protected boolean compareDateValues(String expectedValue, String actualValue) {
var destinationDate = LocalDate.ofEpochDay(Long.parseLong(actualValue));
var expectedDate = LocalDate.parse(expectedValue, DateTimeFormatter.ofPattern(AIRBYTE_DATE_FORMAT));
LocalDate destinationDate = LocalDate.ofEpochDay(Long.parseLong(actualValue));
LocalDate expectedDate = LocalDate.parse(expectedValue, DateTimeFormatter.ofPattern(AIRBYTE_DATE_FORMAT));
return expectedDate.equals(destinationDate);
}

Expand All @@ -28,9 +31,28 @@ protected ZonedDateTime parseDestinationDateWithTz(String destinationValue) {

@Override
protected boolean compareDateTimeValues(String airbyteMessageValue, String destinationValue) {
var format = DateTimeFormatter.ofPattern(AIRBYTE_DATETIME_FORMAT);
DateTimeFormatter format = DateTimeFormatter.ofPattern(AIRBYTE_DATETIME_FORMAT);
LocalDateTime dateTime = LocalDateTime.ofInstant(getInstantFromEpoch(destinationValue), ZoneOffset.UTC);
return super.compareDateTimeValues(airbyteMessageValue, format.format(dateTime));
}

@Override
protected boolean compareTime(final String airbyteMessageValue, final String destinationValue) {
LocalTime destinationDate = LocalTime.ofInstant(getInstantFromEpoch(destinationValue), ZoneOffset.UTC);
LocalTime expectedDate = LocalTime.parse(airbyteMessageValue, DateTimeFormatter.ISO_TIME);
return expectedDate.equals(destinationDate);
}

@Override
protected boolean compareString(final JsonNode expectedValue, final JsonNode actualValue) {
// to handle base64 encoded strings
return expectedValue.asText().equals(actualValue.asText())
|| decodeBase64(expectedValue.asText()).equals(actualValue.asText());
}

private String decodeBase64(String string) {
byte[] decoded = Base64.getDecoder().decode(string);
return new String(decoded, StandardCharsets.UTF_8);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.s3.S3Format;
import io.airbyte.integrations.destination.s3.csv.S3CsvFormatConfig.Flattening;
import io.airbyte.integrations.standardtest.destination.ProtocolVersion;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
Expand All @@ -33,6 +34,11 @@ public GcsCsvDestinationAcceptanceTest() {
super(S3Format.CSV);
}

@Override
public ProtocolVersion getProtocolVersion() {
return ProtocolVersion.V1;
}

@Override
protected JsonNode getFormatConfig() {
return Jsons.jsonNode(Map.of(
Expand All @@ -50,7 +56,9 @@ private static Map<String, String> getFieldTypes(final JsonNode streamSchema) {
final Iterator<Entry<String, JsonNode>> iterator = fieldDefinitions.fields();
while (iterator.hasNext()) {
final Map.Entry<String, JsonNode> entry = iterator.next();
fieldTypes.put(entry.getKey(), entry.getValue().get("type").asText());
JsonNode fieldValue = entry.getValue();
JsonNode typeValue = fieldValue.get("type") == null ? fieldValue.get("$ref") : fieldValue.get("type");
fieldTypes.put(entry.getKey(), typeValue.asText());
}
return fieldTypes;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.destination.s3.csv.S3CsvFormatConfig.Flattening;
import io.airbyte.integrations.standardtest.destination.ProtocolVersion;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
Expand All @@ -17,6 +18,11 @@

public class GcsCsvGzipDestinationAcceptanceTest extends GcsCsvDestinationAcceptanceTest {

@Override
public ProtocolVersion getProtocolVersion() {
return ProtocolVersion.V1;
}

@Override
protected JsonNode getFormatConfig() {
// config without compression defaults to GZIP
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.airbyte.integrations.destination.s3.S3FormatConfig;
import io.airbyte.integrations.destination.s3.S3StorageOperations;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import io.airbyte.integrations.standardtest.destination.ProtocolVersion;
import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator;
import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
Expand Down Expand Up @@ -72,6 +73,11 @@ protected JsonNode getBaseConfigJson() {
return Jsons.deserialize(IOs.readFile(Path.of(SECRET_FILE_PATH)));
}

@Override
public ProtocolVersion getProtocolVersion() {
return ProtocolVersion.V1;
}

@Override
protected String getImageName() {
return "airbyte/destination-gcs:dev";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.s3.S3Format;
import io.airbyte.integrations.standardtest.destination.ProtocolVersion;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
Expand All @@ -24,6 +25,11 @@ public GcsJsonlDestinationAcceptanceTest() {
super(S3Format.JSONL);
}

@Override
public ProtocolVersion getProtocolVersion() {
return ProtocolVersion.V1;
}

@Override
protected JsonNode getFormatConfig() {
return Jsons.jsonNode(Map.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.amazonaws.services.s3.model.S3Object;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.standardtest.destination.ProtocolVersion;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
Expand All @@ -16,6 +17,11 @@

public class GcsJsonlGzipDestinationAcceptanceTest extends GcsJsonlDestinationAcceptanceTest {

@Override
public ProtocolVersion getProtocolVersion() {
return ProtocolVersion.V1;
}

@Override
protected JsonNode getFormatConfig() {
// config without compression defaults to GZIP
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.airbyte.integrations.destination.s3.avro.JsonFieldNameUpdater;
import io.airbyte.integrations.destination.s3.parquet.S3ParquetWriter;
import io.airbyte.integrations.destination.s3.util.AvroRecordHelper;
import io.airbyte.integrations.standardtest.destination.ProtocolVersion;
import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;
import java.io.IOException;
import java.net.URI;
Expand All @@ -37,6 +38,11 @@ public GcsParquetDestinationAcceptanceTest() {
super(S3Format.PARQUET);
}

@Override
public ProtocolVersion getProtocolVersion() {
return ProtocolVersion.V1;
}

@Override
protected JsonNode getFormatConfig() {
return Jsons.jsonNode(Map.of(
Expand Down