diff --git a/airbyte-integrations/connectors/destination-bigquery/build.gradle b/airbyte-integrations/connectors/destination-bigquery/build.gradle index d07349a16e45..dd4199cbc76d 100644 --- a/airbyte-integrations/connectors/destination-bigquery/build.gradle +++ b/airbyte-integrations/connectors/destination-bigquery/build.gradle @@ -28,6 +28,7 @@ dependencies { integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test') integrationTestJavaImplementation files(project(':airbyte-integrations:bases:base-normalization').airbyteDocker.outputs) integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-bigquery') + integrationTestJavaImplementation project(':airbyte-db:lib') implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs) } diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationAcceptanceTest.java index 9aeeff01c80e..3f6fd817b5c9 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationAcceptanceTest.java @@ -11,37 +11,37 @@ import com.google.auth.oauth2.ServiceAccountCredentials; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.BigQueryOptions; +import com.google.cloud.bigquery.ConnectionProperty; import com.google.cloud.bigquery.Dataset; import com.google.cloud.bigquery.DatasetInfo; -import com.google.cloud.bigquery.Field; import com.google.cloud.bigquery.FieldList; -import com.google.cloud.bigquery.FieldValue; -import com.google.cloud.bigquery.FieldValueList; import com.google.cloud.bigquery.Job; import com.google.cloud.bigquery.JobId; import com.google.cloud.bigquery.JobInfo; import com.google.cloud.bigquery.QueryJobConfiguration; import com.google.cloud.bigquery.TableResult; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Maps; +import com.google.common.collect.Streams; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.string.Strings; +import io.airbyte.db.bigquery.BigQueryResultSet; +import io.airbyte.db.bigquery.BigQuerySourceOperations; import io.airbyte.integrations.base.JavaBaseConstants; import io.airbyte.integrations.destination.NamingConventionTransformer; import io.airbyte.integrations.destination.StandardNameTransformer; import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest; +import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator; import java.io.ByteArrayInputStream; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; -import java.util.ArrayList; +import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.Optional; +import java.util.TimeZone; import java.util.UUID; import java.util.stream.Collectors; -import java.util.stream.StreamSupport; import org.apache.commons.lang3.tuple.ImmutablePair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -100,6 +100,27 @@ protected boolean supportNamespaceTest() { return true; } + @Override + protected TestDataComparator getTestDataComparator() { + return new BigQueryTestDataComparator(); + } + + @Override + protected boolean supportBasicDataTypeTest() { + return true; + } + + @Override + protected boolean supportArrayDataTypeTest() { + // #13154 Normalization issue + return false; + } + + @Override + protected boolean supportObjectDataTypeTest() { + return true; + } + @Override protected Optional getNameTransformer() { return Optional.of(NAME_TRANSFORMER); @@ -107,8 +128,8 @@ protected Optional getNameTransformer() { @Override protected void assertNamespaceNormalization(final String testCaseId, - final String expectedNormalizedNamespace, - final String actualNormalizedNamespace) { + final String expectedNormalizedNamespace, + final String actualNormalizedNamespace) { final String message = String.format("Test case %s failed; if this is expected, please override assertNamespaceNormalization", testCaseId); if (testCaseId.equals("S3A-1")) { // bigquery allows namespace starting with a number, and prepending underscore @@ -134,9 +155,9 @@ protected List retrieveNormalizedRecords(final TestDestinationEnv test @Override protected List retrieveRecords(final TestDestinationEnv env, - final String streamName, - final String namespace, - final JsonNode streamSchema) + final String streamName, + final String namespace, + final JsonNode streamSchema) throws Exception { return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namingResolver.getIdentifier(namespace)) .stream() @@ -145,52 +166,24 @@ protected List retrieveRecords(final TestDestinationEnv env, .collect(Collectors.toList()); } - @Override - protected List resolveIdentifier(final String identifier) { - final List result = new ArrayList<>(); - result.add(identifier); - result.add(namingResolver.getIdentifier(identifier)); - return result; - } - private List retrieveRecordsFromTable(final String tableName, final String schema) throws InterruptedException { + TimeZone.setDefault(TimeZone.getTimeZone("UTC")); + final QueryJobConfiguration queryConfig = QueryJobConfiguration .newBuilder( String.format("SELECT * FROM `%s`.`%s` order by %s asc;", schema, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT)) - .setUseLegacySql(false).build(); + .setUseLegacySql(false) + .setConnectionProperties(Collections.singletonList(ConnectionProperty.of("time_zone", "UTC"))) + .build(); final TableResult queryResults = executeQuery(bigquery, queryConfig).getLeft().getQueryResults(); final FieldList fields = queryResults.getSchema().getFields(); + BigQuerySourceOperations sourceOperations = new BigQuerySourceOperations(); - return StreamSupport - .stream(queryResults.iterateAll().spliterator(), false) - .map(row -> { - final Map jsonMap = Maps.newHashMap(); - for (final Field field : fields) { - final Object value = getTypedFieldValue(row, field); - jsonMap.put(field.getName(), value); - } - return jsonMap; - }) - .map(Jsons::jsonNode) - .collect(Collectors.toList()); - } - - private Object getTypedFieldValue(final FieldValueList row, final Field field) { - final FieldValue fieldValue = row.get(field.getName()); - if (fieldValue.getValue() != null) { - return switch (field.getType().getStandardType()) { - case FLOAT64, NUMERIC -> fieldValue.getDoubleValue(); - case INT64 -> fieldValue.getNumericValue().intValue(); - case STRING -> fieldValue.getStringValue(); - case BOOL -> fieldValue.getBooleanValue(); - default -> fieldValue.getValue(); - }; - } else { - return null; - } + return Streams.stream(queryResults.iterateAll()) + .map(fieldValues -> sourceOperations.rowToJson(new BigQueryResultSet(fieldValues, fields))).collect(Collectors.toList()); } @Override diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java index dab6a5fabfd6..168b05c899b5 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java @@ -111,7 +111,8 @@ class BigQueryDestinationTest { private static Stream datasetIdResetterProvider() { // parameterized test with two dataset-id patterns: `dataset_id` and `project-id:dataset_id` return Stream.of( - Arguments.arguments(new DatasetIdResetter(config -> {})), + Arguments.arguments(new DatasetIdResetter(config -> { + })), Arguments.arguments(new DatasetIdResetter( config -> { final String projectId = config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText(); @@ -152,9 +153,9 @@ void setup(final TestInfo info) throws IOException { catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList( CatalogHelpers.createConfiguredAirbyteStream(USERS_STREAM_NAME, datasetId, - io.airbyte.protocol.models.Field.of("name", JsonSchemaType.STRING), - io.airbyte.protocol.models.Field - .of("id", JsonSchemaType.STRING)) + io.airbyte.protocol.models.Field.of("name", JsonSchemaType.STRING), + io.airbyte.protocol.models.Field + .of("id", JsonSchemaType.STRING)) .withDestinationSyncMode(DestinationSyncMode.APPEND), CatalogHelpers.createConfiguredAirbyteStream(TASKS_STREAM_NAME, datasetId, Field.of("goal", JsonSchemaType.STRING)))); diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryTestDataComparator.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryTestDataComparator.java new file mode 100644 index 000000000000..7d0b7be67d56 --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryTestDataComparator.java @@ -0,0 +1,95 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.bigquery; + +import io.airbyte.integrations.destination.StandardNameTransformer; +import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BigQueryTestDataComparator extends AdvancedTestDataComparator { + + private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryTestDataComparator.class); + private final StandardNameTransformer namingResolver = new StandardNameTransformer(); + + private static final String BIGQUERY_DATETIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss'Z'"; + + @Override + protected List resolveIdentifier(final String identifier) { + final List result = new ArrayList<>(); + result.add(identifier); + result.add(namingResolver.getIdentifier(identifier)); + return result; + } + + private LocalDate parseDate(String dateValue) { + if (dateValue != null) { + var format = (dateValue.matches(".+Z") ? BIGQUERY_DATETIME_FORMAT : AIRBYTE_DATE_FORMAT); + return LocalDate.parse(dateValue, DateTimeFormatter.ofPattern(format)); + } else { + return null; + } + } + + private LocalDateTime parseDateTime(String dateTimeValue) { + if (dateTimeValue != null) { + var format = (dateTimeValue.matches(".+Z") ? BIGQUERY_DATETIME_FORMAT : AIRBYTE_DATETIME_FORMAT); + return LocalDateTime.parse(dateTimeValue, DateTimeFormatter.ofPattern(format)); + } else { + return null; + } + } + + @Override + protected boolean compareDateTimeValues(String expectedValue, String actualValue) { + var destinationDate = parseDateTime(actualValue); + var expectedDate = LocalDateTime.parse(expectedValue, DateTimeFormatter.ofPattern(AIRBYTE_DATETIME_FORMAT)); + // #13123 Normalization issue + if (expectedDate.isBefore(getBrokenDate().toLocalDateTime())) { + LOGGER + .warn("Validation is skipped due to known Normalization issue. Values older then 1583 year and with time zone stored wrongly(lose days)."); + return true; + } else { + return expectedDate.equals(destinationDate); + } + } + + @Override + protected boolean compareDateValues(String expectedValue, String actualValue) { + var destinationDate = parseDate(actualValue); + var expectedDate = LocalDate.parse(expectedValue, DateTimeFormatter.ofPattern(AIRBYTE_DATE_FORMAT)); + return expectedDate.equals(destinationDate); + } + + @Override + protected ZonedDateTime parseDestinationDateWithTz(String destinationValue) { + return ZonedDateTime.of(LocalDateTime.parse(destinationValue, DateTimeFormatter.ofPattern(BIGQUERY_DATETIME_FORMAT)), ZoneOffset.UTC); + } + + @Override + protected boolean compareDateTimeWithTzValues(String airbyteMessageValue, String destinationValue) { + // #13123 Normalization issue + if (parseDestinationDateWithTz(destinationValue).isBefore(getBrokenDate())) { + LOGGER + .warn("Validation is skipped due to known Normalization issue. Values older then 1583 year and with time zone stored wrongly(lose days)."); + return true; + } else { + return super.compareDateTimeWithTzValues(airbyteMessageValue, destinationValue); + } + } + + // #13123 Normalization issue + private ZonedDateTime getBrokenDate() { + return ZonedDateTime.of(1583, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC); + } + +}