Skip to content

Commit

Permalink
🎉 BigQuery Denormalized Destination: Support for more bigquery types …
Browse files Browse the repository at this point in the history
…through the format annotation (#6145)

* destination-bigquery-denormalized: introduce json spec format key handling

* destination-bigquery-denormalized: Bump version
  • Loading branch information
cptjacky authored Sep 29, 2021
1 parent 805fabb commit 0105ca9
Show file tree
Hide file tree
Showing 8 changed files with 167 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "079d5540-f236-4294-ba7c-ade8fd918496",
"name": "BigQuery (denormalized typed struct)",
"dockerRepository": "airbyte/destination-bigquery-denormalized",
"dockerImageTag": "0.1.5",
"dockerImageTag": "0.1.6",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/bigquery"
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
- destinationDefinitionId: 079d5540-f236-4294-ba7c-ade8fd918496
name: BigQuery (denormalized typed struct)
dockerRepository: airbyte/destination-bigquery-denormalized
dockerImageTag: 0.1.5
dockerImageTag: 0.1.6
documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery
- destinationDefinitionId: ca8f6566-e555-4b40-943a-545bf123117a
name: Google Cloud Storage (GCS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.5
LABEL io.airbyte.version=0.1.6
LABEL io.airbyte.name=airbyte/destination-bigquery-denormalized
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class BigQueryDenormalizedDestination extends BigQueryDestination {
protected static final String PROPERTIES_FIELD = "properties";
protected static final String NESTED_ARRAY_FIELD = "value";
private static final String TYPE_FIELD = "type";
private static final String FORMAT_FIELD = "format";

@Override
protected String getTargetTableName(String streamName) {
Expand Down Expand Up @@ -134,6 +135,16 @@ private static Builder getField(BigQuerySQLNameTransformer namingResolver, Strin
}
}
}

// If a specific format is defined, use their specific type instead of the JSON's one
final JsonNode fieldFormat = fieldDefinition.get(FORMAT_FIELD);
if (fieldFormat != null) {
final JsonSchemaFormat schemaFormat = JsonSchemaFormat.fromJsonSchemaFormat(fieldFormat.asText());
if (schemaFormat != null) {
builder.setType(schemaFormat.getBigQueryType());
}
}

return builder;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.integrations.destination.bigquery;

import com.google.cloud.bigquery.StandardSQLTypeName;

/**
* Mapping of JsonSchema formats to BigQuery Standard SQL types.
*/
public enum JsonSchemaFormat {

DATE("date", StandardSQLTypeName.DATE),
DATETIME("date-time", StandardSQLTypeName.DATETIME),
TIME("time", StandardSQLTypeName.TIME);

private final String jsonSchemaFormat;
private final StandardSQLTypeName bigQueryType;

JsonSchemaFormat(String jsonSchemaFormat, StandardSQLTypeName bigQueryType) {
this.jsonSchemaFormat = jsonSchemaFormat;
this.bigQueryType = bigQueryType;
}

public static JsonSchemaFormat fromJsonSchemaFormat(String value) {
for (JsonSchemaFormat type : values()) {
if (value.equals(type.jsonSchemaFormat)) {
return type;
}
}
return null;
}


public String getJsonSchemaFormat() {
return jsonSchemaFormat;
}

public StandardSQLTypeName getBigQueryType() {
return bigQueryType;
}

@Override
public String toString() {
return jsonSchemaFormat;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,17 @@
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Dataset;
import com.google.cloud.bigquery.DatasetInfo;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStream;
Expand All @@ -40,6 +44,7 @@
import java.util.stream.StreamSupport;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
Expand All @@ -64,6 +69,10 @@ class BigQueryDenormalizedDestinationTest {
.withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME)
.withData(getDataWithEmptyObjectAndArray())
.withEmittedAt(NOW.toEpochMilli()));
private static final AirbyteMessage MESSAGE_USERS3 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
.withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME)
.withData(getDataWithFormats())
.withEmittedAt(NOW.toEpochMilli()));

private JsonNode config;

Expand Down Expand Up @@ -99,6 +108,7 @@ void setup(TestInfo info) throws IOException {
final String datasetLocation = "EU";
MESSAGE_USERS1.getRecord().setNamespace(datasetId);
MESSAGE_USERS2.getRecord().setNamespace(datasetId);
MESSAGE_USERS3.getRecord().setNamespace(datasetId);

final DatasetInfo datasetInfo = DatasetInfo.newBuilder(datasetId).setLocation(datasetLocation).build();
dataset = bigquery.create(datasetInfo);
Expand Down Expand Up @@ -166,7 +176,39 @@ void testNestedWrite(JsonNode schema, AirbyteMessage message) throws Exception {
assertEquals(extractJsonValues(resultJson, "name"), extractJsonValues(expectedUsersJson, "name"));
assertEquals(extractJsonValues(resultJson, "grants"), extractJsonValues(expectedUsersJson, "grants"));
assertEquals(extractJsonValues(resultJson, "domain"), extractJsonValues(expectedUsersJson, "domain"));
}

@Test
void testWriteWithFormat() throws Exception {
catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(new ConfiguredAirbyteStream()
.withStream(new AirbyteStream().withName(USERS_STREAM_NAME).withNamespace(datasetId).withJsonSchema(getSchemaWithFormats()))
.withSyncMode(SyncMode.FULL_REFRESH).withDestinationSyncMode(DestinationSyncMode.OVERWRITE)));

final BigQueryDestination destination = new BigQueryDenormalizedDestination();
final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, Destination::defaultOutputRecordCollector);

consumer.accept(MESSAGE_USERS3);
consumer.close();

final List<JsonNode> usersActual = retrieveRecordsAsJson(USERS_STREAM_NAME);
final JsonNode expectedUsersJson = MESSAGE_USERS3.getRecord().getData();
assertEquals(usersActual.size(), 1);
final JsonNode resultJson = usersActual.get(0);
assertEquals(extractJsonValues(resultJson, "name"), extractJsonValues(expectedUsersJson, "name"));
assertEquals(extractJsonValues(resultJson, "date_of_birth"), extractJsonValues(expectedUsersJson, "date_of_birth"));

// Bigquery's datetime type accepts multiple input format but always outputs the same, so we can't expect to receive the value we sent.
assertEquals(extractJsonValues(resultJson, "updated_at"), Set.of("2018-08-19T12:11:35.220"));

final Schema expectedSchema = Schema.of(
Field.of("name", StandardSQLTypeName.STRING),
Field.of("date_of_birth", StandardSQLTypeName.DATE),
Field.of("updated_at", StandardSQLTypeName.DATETIME),
Field.of(JavaBaseConstants.COLUMN_NAME_AB_ID, StandardSQLTypeName.STRING),
Field.of(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, StandardSQLTypeName.TIMESTAMP)
);

assertEquals(BigQueryUtils.getTableDefinition(bigquery, dataset.getDatasetId().getDataset(), USERS_STREAM_NAME).getSchema(), expectedSchema);
}

private Set<String> extractJsonValues(JsonNode node, String attributeName) {
Expand Down Expand Up @@ -252,6 +294,34 @@ private static JsonNode getSchema() {

}

private static JsonNode getSchemaWithFormats() {
return Jsons.deserialize(
"{\n"
+ " \"type\": [\n"
+ " \"object\"\n"
+ " ],\n"
+ " \"properties\": {\n"
+ " \"name\": {\n"
+ " \"type\": [\n"
+ " \"string\"\n"
+ " ]\n"
+ " },\n"
+ " \"date_of_birth\": {\n"
+ " \"type\": [\n"
+ " \"string\"\n"
+ " ],\n"
+ " \"format\": \"date\"\n"
+ " },\n"
+ " \"updated_at\": {\n"
+ " \"type\": [\n"
+ " \"string\"\n"
+ " ],\n"
+ " \"format\": \"date-time\"\n"
+ " }\n"
+ " }\n"
+ "}");
}

private static JsonNode getSchemaWithInvalidArrayType() {
return Jsons.deserialize(
"{\n"
Expand Down Expand Up @@ -310,7 +380,15 @@ private static JsonNode getData() {
+ " }\n"
+ " ]\n"
+ "}");
}

private static JsonNode getDataWithFormats() {
return Jsons.deserialize(
"{\n"
+ " \"name\": \"Andrii\",\n"
+ " \"date_of_birth\": \"1996-01-25\",\n"
+ " \"updated_at\": \"2018-08-19 12:11:35.22\"\n"
+ "}");
}

private static JsonNode getDataWithEmptyObjectAndArray() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,4 +120,9 @@ public static String getDatasetLocation(JsonNode config) {
}
}

static TableDefinition getTableDefinition(BigQuery bigquery, String datasetName, String tableName) {
final TableId tableId = TableId.of(datasetName, tableName);
return bigquery.getTable(tableId).getDefinition();
}

}
1 change: 1 addition & 0 deletions docs/integrations/destinations/bigquery.md
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ Therefore, Airbyte BigQuery destination will convert any invalid characters into

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.6 | 2021-09-16 | [#6145](https://github.com/airbytehq/airbyte/pull/6145) | BigQuery Denormalized support for date, datetime & timestamp types through the json "format" key
| 0.1.5 | 2021-09-07 | [#5881](https://github.com/airbytehq/airbyte/pull/5881) | BigQuery Denormalized NPE fix
| 0.1.4 | 2021-09-04 | [#5813](https://github.com/airbytehq/airbyte/pull/5813) | fix Stackoverflow error when receive a schema from source where "Array" type doesn't contain a required "items" element |
| 0.1.3 | 2021-08-07 | [#5261](https://github.com/airbytehq/airbyte/pull/5261) | 🐛 Destination BigQuery(Denormalized): Fix processing arrays of records |
Expand Down

0 comments on commit 0105ca9

Please sign in to comment.