Skip to content

Commit

Permalink
GH-27: Add allowSchemaUnionization config property (#28)
Browse files Browse the repository at this point in the history
* GH-27: Add allowSchemaUnionization config property

Still needed: unit and possibly integration tests for the logic in the
SchemaManager class

* GH-27: Tweak schema change validation logic

* GH-27: Fix schema update bugs, add unit tests
  • Loading branch information
C0urante authored Sep 23, 2020
1 parent c300894 commit 51ae613
Show file tree
Hide file tree
Showing 7 changed files with 411 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -316,8 +316,9 @@ private SchemaManager newSchemaManager() {
Optional<List<String>> clusteringFieldName = config.getClusteringPartitionFieldName();
boolean allowNewBQFields = config.getBoolean(config.ALLOW_NEW_BIGQUERY_FIELDS_CONFIG);
boolean allowReqFieldRelaxation = config.getBoolean(config.ALLOW_BIGQUERY_REQUIRED_FIELD_RELAXATION_CONFIG);
boolean allowSchemaUnionization = config.getBoolean(config.ALLOW_SCHEMA_UNIONIZATION_CONFIG);
return new SchemaManager(schemaRetriever, schemaConverter, getBigQuery(),
allowNewBQFields, allowReqFieldRelaxation,
allowNewBQFields, allowReqFieldRelaxation, allowSchemaUnionization,
kafkaKeyFieldName, kafkaDataFieldName,
timestampPartitionFieldName, clusteringFieldName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.LegacySQLTypeName;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.TimePartitioning;
import com.google.cloud.bigquery.TimePartitioning.Type;
import com.google.common.annotations.VisibleForTesting;
import com.wepay.kafka.connect.bigquery.api.SchemaRetriever;
import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig;
import com.wepay.kafka.connect.bigquery.convert.KafkaDataBuilder;
Expand All @@ -28,10 +28,6 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

Expand All @@ -47,6 +43,7 @@ public class SchemaManager {
private final BigQuery bigQuery;
private final boolean allowNewBQFields;
private final boolean allowBQRequiredFieldRelaxation;
private final boolean allowSchemaUnionization;
private final Optional<String> kafkaKeyFieldName;
private final Optional<String> kafkaDataFieldName;
private final Optional<String> timestampPartitionFieldName;
Expand All @@ -63,6 +60,7 @@ public class SchemaManager {
* @param bigQuery Used to communicate create/update requests to BigQuery.
* @param allowNewBQFields If set to true, allows new fields to be added to BigQuery Schema.
* @param allowBQRequiredFieldRelaxation If set to true, allows changing field mode from REQUIRED to NULLABLE
* @param allowSchemaUnionization If set to true, allows existing and new schemas to be unionized
* @param kafkaKeyFieldName The name of kafka key field to be used in BigQuery.
* If set to null, Kafka Key Field will not be included in BigQuery.
* @param kafkaDataFieldName The name of kafka data field to be used in BigQuery.
Expand All @@ -79,6 +77,7 @@ public SchemaManager(
BigQuery bigQuery,
boolean allowNewBQFields,
boolean allowBQRequiredFieldRelaxation,
boolean allowSchemaUnionization,
Optional<String> kafkaKeyFieldName,
Optional<String> kafkaDataFieldName,
Optional<String> timestampPartitionFieldName,
Expand All @@ -89,6 +88,7 @@ public SchemaManager(
bigQuery,
allowNewBQFields,
allowBQRequiredFieldRelaxation,
allowSchemaUnionization,
kafkaKeyFieldName,
kafkaDataFieldName,
timestampPartitionFieldName,
Expand All @@ -105,6 +105,7 @@ private SchemaManager(
BigQuery bigQuery,
boolean allowNewBQFields,
boolean allowBQRequiredFieldRelaxation,
boolean allowSchemaUnionization,
Optional<String> kafkaKeyFieldName,
Optional<String> kafkaDataFieldName,
Optional<String> timestampPartitionFieldName,
Expand All @@ -118,6 +119,7 @@ private SchemaManager(
this.bigQuery = bigQuery;
this.allowNewBQFields = allowNewBQFields;
this.allowBQRequiredFieldRelaxation = allowBQRequiredFieldRelaxation;
this.allowSchemaUnionization = allowSchemaUnionization;
this.kafkaKeyFieldName = kafkaKeyFieldName;
this.kafkaDataFieldName = kafkaDataFieldName;
this.timestampPartitionFieldName = timestampPartitionFieldName;
Expand All @@ -135,6 +137,7 @@ public SchemaManager forIntermediateTables() {
bigQuery,
allowNewBQFields,
allowBQRequiredFieldRelaxation,
allowSchemaUnionization,
kafkaKeyFieldName,
kafkaDataFieldName,
timestampPartitionFieldName,
Expand Down Expand Up @@ -163,7 +166,7 @@ public com.google.cloud.bigquery.Schema cachedSchema(TableId table) {
* @param table The BigQuery table to create,
* @param records The sink records used to determine the schema.
*/
public void createOrUpdateTable(TableId table, Set<SinkRecord> records) {
public void createOrUpdateTable(TableId table, List<SinkRecord> records) {
synchronized (lock(tableCreateLocks, table)) {
if (bigQuery.getTable(table) == null) {
logger.debug("{} doesn't exist; creating instead of updating", table(table));
Expand All @@ -184,7 +187,7 @@ public void createOrUpdateTable(TableId table, Set<SinkRecord> records) {
* @param records The sink records used to determine the schema.
* @return whether the table had to be created; if the table already existed, will return false
*/
public boolean createTable(TableId table, Set<SinkRecord> records) {
public boolean createTable(TableId table, List<SinkRecord> records) {
synchronized (lock(tableCreateLocks, table)) {
if (schemaCache.containsKey(table)) {
// Table already exists; noop
Expand Down Expand Up @@ -215,7 +218,7 @@ public boolean createTable(TableId table, Set<SinkRecord> records) {
* @param table The BigQuery table to update.
* @param records The sink records used to update the schema.
*/
public void updateSchema(TableId table, Set<SinkRecord> records) {
public void updateSchema(TableId table, List<SinkRecord> records) {
synchronized (lock(tableUpdateLocks, table)) {
TableInfo tableInfo = getTableInfo(table, records);
if (!schemaCache.containsKey(table)) {
Expand All @@ -232,7 +235,6 @@ public void updateSchema(TableId table, Set<SinkRecord> records) {
logger.debug("Skipping update of {} since current schema should be compatible", table(table));
}
}

}

/**
Expand All @@ -241,17 +243,36 @@ public void updateSchema(TableId table, Set<SinkRecord> records) {
* @param records The sink records used to determine the schema for constructing the table info
* @return The resulting BigQuery table information
*/
// NOTE(review): this span is a GitHub diff hunk; it interleaves the pre-change
// ("old") and post-change ("new") versions of getTableInfo. Annotations below
// mark which variant each line belongs to.
//
// [old] signature took a Set<SinkRecord>; superseded by the List-based version below.
private TableInfo getTableInfo(TableId table, Set<SinkRecord> records) {
// [old] unionization was unconditional: all schemas were collected and unionized.
List<com.google.cloud.bigquery.Schema> bigQuerySchemas = getSchemasList(table, records);
com.google.cloud.bigquery.Schema schema;
// [new] records are an ordered List so "most recent record schema" is well-defined downstream.
private TableInfo getTableInfo(TableId table, List<SinkRecord> records) {
com.google.cloud.bigquery.Schema proposedSchema;
String tableDescription;
try {
// [old] always unionized every collected schema.
schema = getUnionizedSchema(bigQuerySchemas);
// [new] delegates to getAndValidateProposedSchema, which only unionizes when
// allowSchemaUnionization is enabled and otherwise validates the latest record schema.
proposedSchema = getAndValidateProposedSchema(table, records);
tableDescription = getUnionizedTableDescription(records);
} catch (BigQueryConnectException exception) {
// Wrap with the table name for context; the original cause is preserved.
throw new BigQueryConnectException("Failed to unionize schemas of records for the table " + table, exception);
}
// [old] returned the unconditionally-unionized schema.
return constructTableInfo(table, schema, tableDescription);
// [new] returns the validated proposed schema.
return constructTableInfo(table, proposedSchema, tableDescription);
}

/**
 * Computes the schema to propose for the given table from the supplied records.
 *
 * <p>When {@code allowSchemaUnionization} is enabled, the proposal is the union of the
 * table's current schema (if any) and the schemas of all records. Otherwise the proposal
 * is the schema of the most recent record, validated against the table's current schema;
 * required-to-nullable relaxations are applied only when
 * {@code allowBQRequiredFieldRelaxation} is enabled.
 *
 * @param table the BigQuery table the schema is being proposed for
 * @param records the sink records, in order; the last element is treated as the latest
 * @return the proposed BigQuery schema
 */
@VisibleForTesting
com.google.cloud.bigquery.Schema getAndValidateProposedSchema(
    TableId table, List<SinkRecord> records) {
  if (allowSchemaUnionization) {
    // Union every visible schema: the existing table schema plus each record's schema.
    return getUnionizedSchema(getSchemasList(table, records));
  }
  // Unionization disabled: read the current table schema first (may be null if the
  // table does not exist yet), then derive the proposal from the latest record.
  com.google.cloud.bigquery.Schema currentSchema = readTableSchema(table);
  com.google.cloud.bigquery.Schema proposedSchema =
      convertRecordSchema(records.get(records.size() - 1));
  if (currentSchema != null) {
    // Reject additions/relaxations that the connector config does not permit.
    validateSchemaChange(currentSchema, proposedSchema);
    if (allowBQRequiredFieldRelaxation) {
      proposedSchema = relaxFieldsWhereNecessary(currentSchema, proposedSchema);
    }
  }
  return proposedSchema;
}

/**
Expand All @@ -260,30 +281,34 @@ private TableInfo getTableInfo(TableId table, Set<SinkRecord> records) {
* @param records The sink records' schemas to add to the list of schemas
* @return List of BigQuery schemas
*/
// NOTE(review): GitHub diff hunk — pre-change ("old") and post-change ("new")
// lines of getSchemasList are interleaved below.
//
// [old] took a Set<SinkRecord>.
private List<com.google.cloud.bigquery.Schema> getSchemasList(TableId table, Set<SinkRecord> records) {
// [new] takes an ordered List<SinkRecord> instead.
private List<com.google.cloud.bigquery.Schema> getSchemasList(TableId table, List<SinkRecord> records) {
List<com.google.cloud.bigquery.Schema> bigQuerySchemas = new ArrayList<>();
// [old] fetched the table twice and read its schema inline.
if (bigQuery.getTable(table) != null) {
Table bigQueryTable = bigQuery.getTable(table.getDataset(), table.getTable());
bigQuerySchemas.add(bigQueryTable.getDefinition().getSchema());
}
// [new] single null-safe read via readTableSchema; existing table schema goes first.
Optional.ofNullable(readTableSchema(table)).ifPresent(bigQuerySchemas::add);
for (SinkRecord record : records) {
// [old] inline key/value schema retrieval and conversion ...
Schema kafkaValueSchema = schemaRetriever.retrieveValueSchema(record);
Schema kafkaKeySchema = kafkaKeyFieldName.isPresent() ? schemaRetriever.retrieveKeySchema(record) : null;
com.google.cloud.bigquery.Schema schema = getBigQuerySchema(kafkaKeySchema, kafkaValueSchema);
bigQuerySchemas.add(schema);
// [new] ... now factored out into convertRecordSchema.
bigQuerySchemas.add(convertRecordSchema(record));
}
return bigQuerySchemas;
}

/**
 * Converts a single sink record's Kafka Connect schema(s) into a BigQuery schema.
 * The record's key schema is retrieved and included only when a kafka key field
 * name has been configured.
 *
 * @param record the sink record whose schema should be converted
 * @return the corresponding BigQuery schema
 */
private com.google.cloud.bigquery.Schema convertRecordSchema(SinkRecord record) {
  Schema valueSchema = schemaRetriever.retrieveValueSchema(record);
  Schema keySchema = null;
  if (kafkaKeyFieldName.isPresent()) {
    keySchema = schemaRetriever.retrieveKeySchema(record);
  }
  return getBigQuerySchema(keySchema, valueSchema);
}

/**
* Returns a unionized schema from a list of BigQuery schemas
* @param schemas The list of BigQuery schemas to unionize
* @return The resulting unionized BigQuery schema
*/
// NOTE(review): GitHub diff hunk — pre-change ("old") and post-change ("new")
// lines of getUnionizedSchema are interleaved below.
private com.google.cloud.bigquery.Schema getUnionizedSchema(List<com.google.cloud.bigquery.Schema> schemas) {
// Fold the schemas left-to-right, starting from the first (the existing table schema
// when present, per getSchemasList).
com.google.cloud.bigquery.Schema currentSchema = schemas.get(0);
com.google.cloud.bigquery.Schema proposedSchema;
for (int i = 1; i < schemas.size(); i++) {
// [old] unionized without validating each step.
currentSchema = unionizeSchemas(currentSchema, schemas.get(i));
// [new] each pairwise union is validated against the accumulated schema before
// it is accepted, so disallowed additions/relaxations fail fast.
proposedSchema = unionizeSchemas(currentSchema, schemas.get(i));
validateSchemaChange(currentSchema, proposedSchema);
currentSchema = proposedSchema;
}
return currentSchema;
}
Expand All @@ -294,39 +319,86 @@ private com.google.cloud.bigquery.Schema getUnionizedSchema(List<com.google.clou
* @param secondSchema The second BigQuery schema to unionize
* @return The resulting unionized BigQuery schema
*/
// NOTE(review): GitHub diff hunk — the removed ("old") unionizeSchemas body and the
// rewritten ("new") body are interleaved below. The old body continues further down
// the page, interleaved with validateSchemaChange.
//
// [old] single-line signature.
private com.google.cloud.bigquery.Schema unionizeSchemas(com.google.cloud.bigquery.Schema firstSchema, com.google.cloud.bigquery.Schema secondSchema) {
// [new] wrapped signature.
private com.google.cloud.bigquery.Schema unionizeSchemas(
com.google.cloud.bigquery.Schema firstSchema, com.google.cloud.bigquery.Schema secondSchema) {
Map<String, Field> firstSchemaFields = schemaFields(firstSchema);
Map<String, Field> secondSchemaFields = schemaFields(secondSchema);
// [old] mutated firstSchemaFields in place and enforced config checks inline
// (validation has since been split out into validateSchemaChange).
for (Map.Entry<String, Field> entry : secondSchemaFields.entrySet()) {
if (!firstSchemaFields.containsKey(entry.getKey())) {
if (allowNewBQFields && (entry.getValue().getMode().equals(Field.Mode.NULLABLE)
|| (entry.getValue().getMode().equals(Field.Mode.REQUIRED) && allowBQRequiredFieldRelaxation))) {
firstSchemaFields.put(entry.getKey(), entry.getValue().toBuilder().setMode(Field.Mode.NULLABLE).build());
} else {
// [new] pure union: build a fresh ordered map, first schema's field order wins.
Map<String, Field> unionizedSchemaFields = new LinkedHashMap<>();

// [new] fields only in the first schema become NULLABLE; relaxed fields take the
// second schema's (NULLABLE) definition; otherwise the first schema's field is kept.
firstSchemaFields.forEach((name, firstField) -> {
Field secondField = secondSchemaFields.get(name);
if (secondField == null) {
unionizedSchemaFields.put(name, firstField.toBuilder().setMode(Field.Mode.NULLABLE).build());
} else if (isFieldRelaxation(firstField, secondField)) {
unionizedSchemaFields.put(name, secondField);
} else {
unionizedSchemaFields.put(name, firstField);
}
});

// [new] fields only in the second schema are appended as NULLABLE.
secondSchemaFields.forEach((name, secondField) -> {
if (!unionizedSchemaFields.containsKey(name)) {
unionizedSchemaFields.put(name, secondField.toBuilder().setMode(Field.Mode.NULLABLE).build());
}
});
return com.google.cloud.bigquery.Schema.of(unionizedSchemaFields.values());
}

// NOTE(review): GitHub diff hunk — the new validateSchemaChange method is interleaved
// with leftover lines of the removed old unionizeSchemas body (marked [old] below).
//
// Validates that every difference between the existing schema and the proposed schema
// is permitted by the connector configuration; throws BigQueryConnectException otherwise.
private void validateSchemaChange(
com.google.cloud.bigquery.Schema existingSchema, com.google.cloud.bigquery.Schema proposedSchema) {
Map<String, Field> earliestSchemaFields = schemaFields(existingSchema);
Map<String, Field> proposedSchemaFields = schemaFields(proposedSchema);
for (Map.Entry<String, Field> entry : proposedSchemaFields.entrySet()) {
if (!earliestSchemaFields.containsKey(entry.getKey())) {
// New field: only allowed when isValidFieldAddition says the config permits it.
if (!isValidFieldAddition(entry.getValue())) {
// [old] error message built on fewer, longer lines.
throw new BigQueryConnectException("New Field found with the name " + entry.getKey()
+ " Ensure that " + BigQuerySinkConfig.ALLOW_NEW_BIGQUERY_FIELDS_CONFIG + " is true and " + BigQuerySinkConfig.ALLOW_BIGQUERY_REQUIRED_FIELD_RELAXATION_CONFIG +
" is true if " + entry.getKey() + " has mode REQUIRED in order to update the Schema");
// [new] same message, rewrapped.
+ " Ensure that " + BigQuerySinkConfig.ALLOW_NEW_BIGQUERY_FIELDS_CONFIG + " is true and "
+ BigQuerySinkConfig.ALLOW_BIGQUERY_REQUIRED_FIELD_RELAXATION_CONFIG +
" is true if " + entry.getKey() + " has mode REQUIRED in order to update the Schema");
}
} else {
// [old] remnant of the removed unionizeSchemas body: detected REQUIRED -> NULLABLE
// in place (note it references firstSchemaFields/secondSchemaFields, which belong
// to the old method, not to validateSchemaChange).
if (firstSchemaFields.get(entry.getKey()).getMode().equals(Field.Mode.REQUIRED) && secondSchemaFields.get(entry.getKey()).getMode().equals(Field.Mode.NULLABLE)) {
if (allowBQRequiredFieldRelaxation) {
firstSchemaFields.put(entry.getKey(), entry.getValue().toBuilder().setMode(Field.Mode.NULLABLE).build());
} else {
throw new BigQueryConnectException( entry.getKey() + " has mode REQUIRED. Set " + BigQuerySinkConfig.ALLOW_BIGQUERY_REQUIRED_FIELD_RELAXATION_CONFIG
+ " to true, to change the mode to NULLABLE");
}
// [new] relaxation check factored into isFieldRelaxation; only throws when the
// config forbids relaxation.
} else if (isFieldRelaxation(earliestSchemaFields.get(entry.getKey()), entry.getValue())) {
if (!allowBQRequiredFieldRelaxation) {
throw new BigQueryConnectException( entry.getKey() + " has mode REQUIRED. Set "
+ BigQuerySinkConfig.ALLOW_BIGQUERY_REQUIRED_FIELD_RELAXATION_CONFIG
+ " to true, to change the mode to NULLABLE");
}
}
}
}
// [old] remnant of the removed unionizeSchemas body's return statement.
return com.google.cloud.bigquery.Schema.of(firstSchemaFields.values());
}

/**
 * Returns whether moving from {@code currentField} to {@code proposedField} is a
 * mode relaxation, i.e. a change from REQUIRED to NULLABLE.
 */
private boolean isFieldRelaxation(Field currentField, Field proposedField) {
  // Check the current mode first; the proposed mode is only consulted when the
  // current field is REQUIRED (mirrors the short-circuit of the && form).
  if (!currentField.getMode().equals(Field.Mode.REQUIRED)) {
    return false;
  }
  return proposedField.getMode().equals(Field.Mode.NULLABLE);
}

/**
 * Returns whether adding {@code newField} to an existing table schema is permitted
 * by the connector configuration: new fields must be enabled, and a REQUIRED field
 * is only addable when required-field relaxation is also enabled.
 */
private boolean isValidFieldAddition(Field newField) {
  // Additions are categorically disallowed unless the config enables them.
  if (!allowNewBQFields) {
    return false;
  }
  if (newField.getMode().equals(Field.Mode.NULLABLE)) {
    return true;
  }
  // A REQUIRED addition is only valid if it may be relaxed to NULLABLE.
  return newField.getMode().equals(Field.Mode.REQUIRED) && allowBQRequiredFieldRelaxation;
}

/**
 * Produces a copy of {@code proposedSchema} in which every field that does not exist
 * in {@code existingSchema} is forced to mode NULLABLE; fields already present in the
 * existing schema are carried over from the proposal unchanged. Field order follows
 * the proposed schema.
 */
private com.google.cloud.bigquery.Schema relaxFieldsWhereNecessary(
    com.google.cloud.bigquery.Schema existingSchema,
    com.google.cloud.bigquery.Schema proposedSchema) {
  Map<String, Field> existingFields = schemaFields(existingSchema);
  List<Field> relaxedFields = new ArrayList<>();
  schemaFields(proposedSchema).forEach((name, proposedField) -> {
    if (existingFields.containsKey(name)) {
      // Field already exists: keep the proposed definition as-is.
      relaxedFields.add(proposedField);
    } else {
      // Brand-new field: relax to NULLABLE so it cannot break existing rows.
      relaxedFields.add(proposedField.toBuilder().setMode(Field.Mode.NULLABLE).build());
    }
  });
  return com.google.cloud.bigquery.Schema.of(relaxedFields);
}

/**
* Returns a unionized table description from a set of sink records going to the same BigQuery table.
* @param records The records used to get the unionized table description
* @return The resulting table description
*/
private String getUnionizedTableDescription(Set<SinkRecord> records) {
private String getUnionizedTableDescription(List<SinkRecord> records) {
String tableDescription = null;
for (SinkRecord record : records) {
Schema kafkaValueSchema = schemaRetriever.retrieveValueSchema(record);
Expand Down Expand Up @@ -470,7 +542,9 @@ private String table(TableId table) {

// NOTE(review): GitHub diff hunk — the old and new return statements of
// readTableSchema are interleaved below.
//
// Reads the current schema of the given table, or null when unavailable.
private com.google.cloud.bigquery.Schema readTableSchema(TableId table) {
logger.trace("Reading schema for {}", table(table));
// [old] NPE-prone: getTable() returns null for a nonexistent table.
return bigQuery.getTable(table).getDefinition().getSchema();
// [new] null-safe: returns null instead of throwing when the table does not exist.
return Optional.ofNullable(bigQuery.getTable(table))
.map(t -> t.getDefinition().getSchema())
.orElse(null);
}

private Object lock(ConcurrentMap<TableId, Object> locks, TableId table) {
Expand Down
Loading

0 comments on commit 51ae613

Please sign in to comment.