Destination bigquery: upgrade cdk (#35315)
Signed-off-by: Gireesh Sreepathi <[email protected]>
Co-authored-by: Gireesh Sreepathi <[email protected]>
edgao and gisripa authored Mar 4, 2024
1 parent (24c8b44) · commit 160ec72
Showing 11 changed files with 350 additions and 281 deletions.
@@ -1,9 +1,10 @@
plugins {
id 'airbyte-java-connector'
id 'org.jetbrains.kotlin.jvm' version '1.9.22'
}

airbyteJavaConnector {
cdkVersionRequired = '0.20.9'
cdkVersionRequired = '0.23.11'
features = [
'db-destinations',
'datastore-bigquery',
@@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
dockerImageTag: 2.4.11
dockerImageTag: 2.4.12
dockerRepository: airbyte/destination-bigquery
documentationUrl: https://docs.airbyte.com/integrations/destinations/bigquery
githubIssueLabel: destination-bigquery
@@ -61,6 +61,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -233,9 +234,11 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
final boolean disableTypeDedupe = BigQueryUtils.getDisableTypeDedupFlag(config);
final String datasetLocation = BigQueryUtils.getDatasetLocation(config);
final BigQuerySqlGenerator sqlGenerator = new BigQuerySqlGenerator(config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText(), datasetLocation);
final ParsedCatalog parsedCatalog = parseCatalog(config, catalog, datasetLocation);
final Optional<String> rawNamespaceOverride = TypingAndDedupingFlag.getRawNamespaceOverride(RAW_DATA_DATASET);
final ParsedCatalog parsedCatalog = parseCatalog(config, catalog, datasetLocation, rawNamespaceOverride);
final BigQuery bigquery = getBigQuery(config);
final TyperDeduper typerDeduper = buildTyperDeduper(sqlGenerator, parsedCatalog, bigquery, datasetLocation, disableTypeDedupe);
final TyperDeduper typerDeduper =
buildTyperDeduper(sqlGenerator, parsedCatalog, bigquery, datasetLocation, disableTypeDedupe);

AirbyteExceptionHandler.addAllStringsInConfigForDeinterpolation(config);
final JsonNode serviceAccountKey = config.get(BigQueryConsts.CONFIG_CREDS);
@@ -360,7 +363,6 @@ private SerializedAirbyteMessageConsumer getStandardRecordConsumer(final BigQuer
final Consumer<AirbyteMessage> outputRecordCollector,
final TyperDeduper typerDeduper)
throws Exception {
typerDeduper.prepareTables();
final Supplier<ConcurrentMap<AirbyteStreamNameNamespacePair, AbstractBigQueryUploader<?>>> writeConfigs = getUploaderMap(
bigquery,
config,
@@ -372,6 +374,8 @@ private SerializedAirbyteMessageConsumer getStandardRecordConsumer(final BigQuer
return new BigQueryRecordStandardConsumer(
outputRecordCollector,
() -> {
typerDeduper.prepareSchemasAndRunMigrations();

// Set up our raw tables
writeConfigs.get().forEach((streamId, uploader) -> {
final StreamConfig stream = parsedCatalog.getStream(streamId);
@@ -390,6 +394,8 @@ private SerializedAirbyteMessageConsumer getStandardRecordConsumer(final BigQuer
uploader.createRawTable();
}
});

typerDeduper.prepareFinalTables();
},
(hasFailed, streamSyncSummaries) -> {
try {
@@ -424,11 +430,13 @@ private void setDefaultStreamNamespace(final ConfiguredAirbyteCatalog catalog, f
}
}

private ParsedCatalog parseCatalog(final JsonNode config, final ConfiguredAirbyteCatalog catalog, final String datasetLocation) {
private ParsedCatalog parseCatalog(final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final String datasetLocation,
final Optional<String> rawNamespaceOverride) {
final BigQuerySqlGenerator sqlGenerator = new BigQuerySqlGenerator(config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText(), datasetLocation);
final CatalogParser catalogParser = TypingAndDedupingFlag.getRawNamespaceOverride(RAW_DATA_DATASET).isPresent()
? new CatalogParser(sqlGenerator, TypingAndDedupingFlag.getRawNamespaceOverride(RAW_DATA_DATASET).get())
: new CatalogParser(sqlGenerator);
final CatalogParser catalogParser = rawNamespaceOverride.map(s -> new CatalogParser(sqlGenerator, s))
.orElseGet(() -> new CatalogParser(sqlGenerator));

return catalogParser.parseCatalog(catalog);
}
@@ -440,11 +448,13 @@ private TyperDeduper buildTyperDeduper(final BigQuerySqlGenerator sqlGenerator,
final boolean disableTypeDedupe) {
final BigQueryV1V2Migrator migrator = new BigQueryV1V2Migrator(bigquery, namingResolver);
final BigQueryV2TableMigrator v2RawTableMigrator = new BigQueryV2TableMigrator(bigquery);
final BigQueryDestinationHandler destinationHandler = new BigQueryDestinationHandler(bigquery, datasetLocation);
final BigQueryDestinationHandler destinationHandler = new BigQueryDestinationHandler(
bigquery,
datasetLocation);

if (disableTypeDedupe) {
return new NoOpTyperDeduperWithV1V2Migrations<>(
sqlGenerator, destinationHandler, parsedCatalog, migrator, v2RawTableMigrator, 8);
sqlGenerator, destinationHandler, parsedCatalog, migrator, v2RawTableMigrator, List.of());
}

return new DefaultTyperDeduper<>(
@@ -453,8 +463,7 @@ private TyperDeduper buildTyperDeduper(final BigQuerySqlGenerator sqlGenerator,
parsedCatalog,
migrator,
v2RawTableMigrator,
8);

List.of());
}

@Override
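
For orientation, the parseCatalog hunk above replaces a double lookup of TypingAndDedupingFlag.getRawNamespaceOverride(RAW_DATA_DATASET) with a single Optional that is mapped into the CatalogParser constructor. The snippet below is a minimal, self-contained sketch of that Optional pattern; the Parser record and the "airbyte_internal" default are hypothetical stand-ins, not the CDK's actual CatalogParser.

import java.util.Optional;

public class OptionalOverrideSketch {

  // Hypothetical stand-in for CatalogParser: one constructor using a default raw
  // namespace, one taking an explicit override.
  record Parser(String rawNamespace) {

    Parser() {
      this("airbyte_internal");
    }

  }

  static Parser buildParser(final Optional<String> rawNamespaceOverride) {
    // map() runs only when an override is present; orElseGet() builds the default lazily.
    return rawNamespaceOverride.map(Parser::new)
        .orElseGet(Parser::new);
  }

  public static void main(final String[] args) {
    System.out.println(buildParser(Optional.of("my_raw_dataset"))); // Parser[rawNamespace=my_raw_dataset]
    System.out.println(buildParser(Optional.empty()));              // Parser[rawNamespace=airbyte_internal]
  }

}

Separately, the last argument passed to DefaultTyperDeduper and NoOpTyperDeduperWithV1V2Migrations changes from the numeric 8 to List.of(); this appears to be an (empty) list of destination migrations expected by the newer CDK rather than the old numeric argument, but that reading is an inference from the upgrade, not something the diff itself states.
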
@@ -135,7 +135,8 @@ private OnStartFunction onStartFunction(final BigQueryStagingOperations bigQuery
final TyperDeduper typerDeduper) {
return () -> {
LOGGER.info("Preparing airbyte_raw tables in destination started for {} streams", writeConfigs.size());
typerDeduper.prepareTables();
typerDeduper.prepareSchemasAndRunMigrations();

for (final BigQueryWriteConfig writeConfig : writeConfigs.values()) {
LOGGER.info("Preparing staging are in destination for schema: {}, stream: {}, target table: {}, stage: {}",
writeConfig.tableSchema(), writeConfig.streamName(), writeConfig.targetTableId(), writeConfig.streamName());
@@ -156,6 +157,8 @@ private OnStartFunction onStartFunction(final BigQueryStagingOperations bigQuery
bigQueryGcsOperations.truncateTableIfExists(rawDatasetId, writeConfig.targetTableId(), writeConfig.tableSchema());
}
}

typerDeduper.prepareFinalTables();
LOGGER.info("Preparing tables in destination completed.");
};
}
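
Both this staging path and the standard consumer above replace the single typerDeduper.prepareTables() call with a two-phase sequence around raw-table setup. The sketch below is a minimal, self-contained illustration of that ordering; the TyperDeduper and RawTableSetup interfaces are simplified stand-ins (only the two prepare* method names come from the diff), and everything else is hypothetical.

import java.util.List;

public class PrepareOrderSketch {

  // Simplified stand-in for the CDK's TyperDeduper; only the method names mirror the diff.
  interface TyperDeduper {

    void prepareSchemasAndRunMigrations() throws Exception;

    void prepareFinalTables() throws Exception;

  }

  // Hypothetical hook for the per-stream raw-table setup the uploaders perform.
  interface RawTableSetup {

    void createRawTableIfNeeded();

  }

  static void onStart(final TyperDeduper typerDeduper, final List<RawTableSetup> streams) throws Exception {
    // 1. Create datasets/schemas and run any destination migrations before touching tables.
    typerDeduper.prepareSchemasAndRunMigrations();

    // 2. Set up the raw tables that incoming records are written to.
    for (final RawTableSetup stream : streams) {
      stream.createRawTableIfNeeded();
    }

    // 3. Create or alter the typed final tables last.
    typerDeduper.prepareFinalTables();
  }

  public static void main(final String[] args) throws Exception {
    onStart(
        new TyperDeduper() {

          @Override
          public void prepareSchemasAndRunMigrations() {
            System.out.println("schemas + migrations");
          }

          @Override
          public void prepareFinalTables() {
            System.out.println("final tables");
          }

        },
        List.of(() -> System.out.println("raw table")));
  }

}

Splitting the old prepareTables() this way presumably lets migrations complete before any raw tables are created or truncated; that is an inference from the ordering shown in the hunks, not a claim made by the commit.
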
@@ -4,8 +4,17 @@

package io.airbyte.integrations.destination.bigquery.typing_deduping;

import static io.airbyte.integrations.base.destination.typing_deduping.CollectionUtils.containsAllIgnoreCase;
import static io.airbyte.integrations.base.destination.typing_deduping.CollectionUtils.containsIgnoreCase;
import static io.airbyte.integrations.base.destination.typing_deduping.CollectionUtils.matchingKey;
import static io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGenerator.QUOTE;
import static io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGenerator.clusteringColumns;
import static io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGenerator.toDialectType;
import static java.util.stream.Collectors.toMap;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.FieldValue;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobConfiguration;
@@ -14,28 +23,46 @@
import com.google.cloud.bigquery.JobStatistics;
import com.google.cloud.bigquery.JobStatus;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TimePartitioning;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Streams;
import io.airbyte.cdk.integrations.base.AirbyteExceptionHandler;
import io.airbyte.cdk.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.base.destination.typing_deduping.AlterTableReport;
import io.airbyte.integrations.base.destination.typing_deduping.ColumnId;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus;
import io.airbyte.integrations.base.destination.typing_deduping.InitialRawTableStatus;
import io.airbyte.integrations.base.destination.typing_deduping.Sql;
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
import io.airbyte.integrations.base.destination.typing_deduping.TableNotMigratedException;
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState;
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState.Impl;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.text.StringSubstitutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// TODO this stuff almost definitely exists somewhere else in our codebase.
public class BigQueryDestinationHandler implements DestinationHandler<TableDefinition> {
public class BigQueryDestinationHandler implements DestinationHandler<MinimumDestinationState.Impl> {

private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDestinationHandler.class);

@@ -47,32 +74,24 @@ public BigQueryDestinationHandler(final BigQuery bq, final String datasetLocatio
this.datasetLocation = datasetLocation;
}

@Override
public Optional<TableDefinition> findExistingTable(final StreamId id) {
final Table table = bq.getTable(id.finalNamespace(), id.finalName());
return Optional.ofNullable(table).map(Table::getDefinition);
}

@Override
public LinkedHashMap<String, TableDefinition> findExistingFinalTables(List<StreamId> streamIds) throws Exception {
return null;
}

@Override
public boolean isFinalTableEmpty(final StreamId id) {
return BigInteger.ZERO.equals(bq.getTable(TableId.of(id.finalNamespace(), id.finalName())).getNumRows());
}

@Override
public InitialRawTableState getInitialRawTableState(final StreamId id) throws Exception {
public InitialRawTableStatus getInitialRawTableState(final StreamId id) throws Exception {
final Table rawTable = bq.getTable(TableId.of(id.rawNamespace(), id.rawName()));
if (rawTable == null) {
// Table doesn't exist. There are no unprocessed records, and no timestamp.
return new InitialRawTableState(false, Optional.empty());
return new InitialRawTableStatus(false, false, Optional.empty());
}

final FieldValue unloadedRecordTimestamp = bq.query(QueryJobConfiguration.newBuilder(new StringSubstitutor(Map.of(
"raw_table", id.rawTableId(BigQuerySqlGenerator.QUOTE))).replace(
"raw_table", id.rawTableId(QUOTE))).replace(
// bigquery timestamps have microsecond precision
"""
SELECT TIMESTAMP_SUB(MIN(_airbyte_extracted_at), INTERVAL 1 MICROSECOND)
@@ -84,11 +103,11 @@ SELECT TIMESTAMP_SUB(MIN(_airbyte_extracted_at), INTERVAL 1 MICROSECOND)
// If it's not null, then we can return immediately - we've found some unprocessed records and their
// timestamp.
if (!unloadedRecordTimestamp.isNull()) {
return new InitialRawTableState(true, Optional.of(unloadedRecordTimestamp.getTimestampInstant()));
return new InitialRawTableStatus(true, true, Optional.of(unloadedRecordTimestamp.getTimestampInstant()));
}

final FieldValue loadedRecordTimestamp = bq.query(QueryJobConfiguration.newBuilder(new StringSubstitutor(Map.of(
"raw_table", id.rawTableId(BigQuerySqlGenerator.QUOTE))).replace(
"raw_table", id.rawTableId(QUOTE))).replace(
"""
SELECT MAX(_airbyte_extracted_at)
FROM ${raw_table}
@@ -98,10 +117,10 @@ SELECT MAX(_airbyte_extracted_at)
// So we just need to get the timestamp of the most recent record.
if (loadedRecordTimestamp.isNull()) {
// Null timestamp because the table is empty. T+D can process the entire raw table during this sync.
return new InitialRawTableState(false, Optional.empty());
return new InitialRawTableStatus(true, false, Optional.empty());
} else {
// The raw table already has some records. T+D can skip all records with timestamp <= this value.
return new InitialRawTableState(false, Optional.of(loadedRecordTimestamp.getTimestampInstant()));
return new InitialRawTableStatus(true, false, Optional.of(loadedRecordTimestamp.getTimestampInstant()));
}
}

@@ -172,4 +191,133 @@ public void execute(final Sql sql) throws InterruptedException {
}
}

@Override
public List<DestinationInitialStatus<Impl>> gatherInitialState(List<StreamConfig> streamConfigs) throws Exception {
final List<DestinationInitialStatus<MinimumDestinationState.Impl>> initialStates = new ArrayList<>();
for (final StreamConfig streamConfig : streamConfigs) {
final StreamId id = streamConfig.id();
final Optional<TableDefinition> finalTable = findExistingTable(id);
final InitialRawTableStatus rawTableState = getInitialRawTableState(id);
initialStates.add(new DestinationInitialStatus<>(
streamConfig,
finalTable.isPresent(),
rawTableState,
finalTable.isPresent() && !existingSchemaMatchesStreamConfig(streamConfig, finalTable.get()),
finalTable.isEmpty() || isFinalTableEmpty(id),
// Return a default state blob since we don't actually track state.
new MinimumDestinationState.Impl(false)));
}
return initialStates;
}

@Override
public void commitDestinationStates(Map<StreamId, MinimumDestinationState.Impl> destinationStates) throws Exception {
// Intentionally do nothing. Bigquery doesn't actually support destination states.
}

private boolean existingSchemaMatchesStreamConfig(final StreamConfig stream,
final TableDefinition existingTable)
throws TableNotMigratedException {
final var alterTableReport = buildAlterTableReport(stream, existingTable);
boolean tableClusteringMatches = false;
boolean tablePartitioningMatches = false;
if (existingTable instanceof final StandardTableDefinition standardExistingTable) {
tableClusteringMatches = clusteringMatches(stream, standardExistingTable);
tablePartitioningMatches = partitioningMatches(standardExistingTable);
}
LOGGER.info("Alter Table Report {} {} {}; Clustering {}; Partitioning {}",
alterTableReport.columnsToAdd(),
alterTableReport.columnsToRemove(),
alterTableReport.columnsToChangeType(),
tableClusteringMatches,
tablePartitioningMatches);

return alterTableReport.isNoOp() && tableClusteringMatches && tablePartitioningMatches;
}

public AlterTableReport buildAlterTableReport(final StreamConfig stream, final TableDefinition existingTable) {
final Set<String> pks = getPks(stream);

final Map<String, StandardSQLTypeName> streamSchema = stream.columns().entrySet().stream()
.collect(toMap(
entry -> entry.getKey().name(),
entry -> toDialectType(entry.getValue())));

final Map<String, StandardSQLTypeName> existingSchema = existingTable.getSchema().getFields().stream()
.collect(toMap(
field -> field.getName(),
field -> field.getType().getStandardType()));

// Columns in the StreamConfig that don't exist in the TableDefinition
final Set<String> columnsToAdd = streamSchema.keySet().stream()
.filter(name -> !containsIgnoreCase(existingSchema.keySet(), name))
.collect(Collectors.toSet());

// Columns in the current schema that are no longer in the StreamConfig
final Set<String> columnsToRemove = existingSchema.keySet().stream()
.filter(name -> !containsIgnoreCase(streamSchema.keySet(), name) && !containsIgnoreCase(
JavaBaseConstants.V2_FINAL_TABLE_METADATA_COLUMNS, name))
.collect(Collectors.toSet());

// Columns that are typed differently than the StreamConfig
final Set<String> columnsToChangeType = Stream.concat(
streamSchema.keySet().stream()
// If it's not in the existing schema, it should already be in the columnsToAdd Set
.filter(name -> {
// Big Query Columns are case-insensitive, first find the correctly cased key if it exists
return matchingKey(existingSchema.keySet(), name)
// if it does exist, only include it in this set if the type (the value in each respective map)
// is different between the stream and existing schemas
.map(key -> !existingSchema.get(key).equals(streamSchema.get(name)))
// if there is no matching key, then don't include it because it is probably already in columnsToAdd
.orElse(false);
}),

// OR columns that used to have a non-null constraint and shouldn't
// (https://github.com/airbytehq/airbyte/pull/31082)
existingTable.getSchema().getFields().stream()
.filter(field -> pks.contains(field.getName()))
.filter(field -> field.getMode() == Field.Mode.REQUIRED)
.map(Field::getName))
.collect(Collectors.toSet());

final boolean isDestinationV2Format = schemaContainAllFinalTableV2AirbyteColumns(existingSchema.keySet());

return new AlterTableReport(columnsToAdd, columnsToRemove, columnsToChangeType, isDestinationV2Format);
}

@VisibleForTesting
public static boolean clusteringMatches(final StreamConfig stream, final StandardTableDefinition existingTable) {
return existingTable.getClustering() != null
&& containsAllIgnoreCase(
new HashSet<>(existingTable.getClustering().getFields()),
clusteringColumns(stream));
}

@VisibleForTesting
public static boolean partitioningMatches(final StandardTableDefinition existingTable) {
return existingTable.getTimePartitioning() != null
&& existingTable.getTimePartitioning()
.getField()
.equalsIgnoreCase("_airbyte_extracted_at")
&& TimePartitioning.Type.DAY.equals(existingTable.getTimePartitioning().getType());
}

/**
* Checks the schema to determine whether the table contains all expected final table airbyte
* columns
*
* @param columnNames the column names of the schema to check
* @return whether all the {@link JavaBaseConstants#V2_FINAL_TABLE_METADATA_COLUMNS} are present
*/
@VisibleForTesting
public static boolean schemaContainAllFinalTableV2AirbyteColumns(final Collection<String> columnNames) {
return JavaBaseConstants.V2_FINAL_TABLE_METADATA_COLUMNS.stream()
.allMatch(column -> containsIgnoreCase(columnNames, column));
}

private static Set<String> getPks(final StreamConfig stream) {
return stream.primaryKey() != null ? stream.primaryKey().stream().map(ColumnId::name).collect(Collectors.toSet()) : Collections.emptySet();
}

}
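
buildAlterTableReport above diffs the configured stream schema against the existing BigQuery table using case-insensitive column matching. The following self-contained sketch reproduces just that set logic with plain java.util collections; the containsIgnoreCase and matchingKey helpers are simplified stand-ins for the CDK's CollectionUtils methods of the same names, and the example schemas are invented.

import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

public class SchemaDiffSketch {

  // Stand-in for CollectionUtils.containsIgnoreCase: case-insensitive membership test.
  static boolean containsIgnoreCase(final Set<String> names, final String name) {
    return names.stream().anyMatch(n -> n.equalsIgnoreCase(name));
  }

  // Stand-in for CollectionUtils.matchingKey: find the correctly cased key, if any.
  static Optional<String> matchingKey(final Set<String> names, final String name) {
    return names.stream().filter(n -> n.equalsIgnoreCase(name)).findFirst();
  }

  public static void main(final String[] args) {
    // Hypothetical schemas: what the stream declares vs. what already exists in the destination.
    final Map<String, String> streamSchema = Map.of("id", "INT64", "updated_at", "TIMESTAMP", "name", "STRING");
    final Map<String, String> existingSchema = Map.of("ID", "INT64", "updated_at", "STRING", "legacy_col", "STRING");

    // Columns the stream declares that the table doesn't have yet (case-insensitive).
    final Set<String> columnsToAdd = streamSchema.keySet().stream()
        .filter(name -> !containsIgnoreCase(existingSchema.keySet(), name))
        .collect(Collectors.toSet());

    // Columns on the table that the stream no longer declares.
    final Set<String> columnsToRemove = existingSchema.keySet().stream()
        .filter(name -> !containsIgnoreCase(streamSchema.keySet(), name))
        .collect(Collectors.toSet());

    // Columns present on both sides whose types differ.
    final Set<String> columnsToChangeType = streamSchema.keySet().stream()
        .filter(name -> matchingKey(existingSchema.keySet(), name)
            .map(key -> !existingSchema.get(key).equals(streamSchema.get(name)))
            .orElse(false))
        .collect(Collectors.toSet());

    System.out.println("add=" + columnsToAdd);           // add=[name]
    System.out.println("remove=" + columnsToRemove);     // remove=[legacy_col]
    System.out.println("retype=" + columnsToChangeType); // retype=[updated_at]
  }

}

The real method additionally excludes the Airbyte V2 final-table metadata columns from columnsToRemove and flags primary-key columns still marked REQUIRED, as shown in the diff above.
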