Generalize schema retriever #291
Conversation
// SMT RegexTransformation replaces the topic with <Dataset>=<TableName>
String dataset = record.topic().split("=")[0];
String tableName = record.topic().split("=")[1];
Is there heavy demand for dynamic routing to different datasets? An alternative here could be to allow the user to configure a single static dataset in the connector and, if multiple datasets are desired, use multiple connector instances.
Or, for something in the middle, we can allow the user to specify a static dataset as a "default" in case the record topic doesn't use the <dataset>=<table>
syntax.
What do you think?
This sounds good to me
I agree with @C0urante. This part feels a little over-engineered to me. Some notes:
- Allow users to set a default dataset.
- Multiple identical string splits are wasteful (slow) operations. Just split once.
- Nit: `=` as a divider is confusing to me for the dataset/table splitter. Something like `:` seems better--it's used for splitting Java classpaths, for example. `:` is a reserved char in both Kafka topics and BQ dataset/tables (I think), so it should be safe.
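A minimal sketch of the splitting discussed above, assuming a hypothetical `:` delimiter and a `defaultDataset` fallback (both names are illustrative, not the connector's actual config):

```java
import java.util.AbstractMap;
import java.util.Map;

public class TopicRouting {
    /**
     * Resolves a topic name of the form "<dataset>:<table>" into a
     * (dataset, table) pair. Topics without a delimiter fall back to
     * the configured default dataset. The topic is split only once,
     * avoiding the repeated split() calls from the original snippet.
     */
    static Map.Entry<String, String> resolve(String topic, String defaultDataset) {
        // Split at most once on the ':' delimiter
        String[] parts = topic.split(":", 2);
        if (parts.length == 2) {
            return new AbstractMap.SimpleEntry<String, String>(parts[0], parts[1]);
        }
        return new AbstractMap.SimpleEntry<String, String>(defaultDataset, topic);
    }
}
```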
Will update accordingly
// Dynamic update shall not require connector restart and shall compute table id in runtime.
if (!topicsToBaseTableIds.containsKey(record.topic())) {
  TopicToTableResolver.updateTopicToTable(config, record.topic(), topicsToBaseTableIds);
private void ensureExistingTable(TableId table) {
We could rename this to `maybeEnsureExistingTable` and then do the check for `config.getBoolean(config.TABLE_CREATE_CONFIG)` here instead of putting that burden on the caller.
Yes. Will make this change.
BucketInfo bucketInfo = BucketInfo.of(bucketName);
bucket = gcs.create(bucketInfo);
}
else throw new ConfigException("Bucket does not exist. Set "+ config.AUTO_CREATE_BUCKET_CONFIG + " to true");
Suggestion:
else throw new ConfigException("Bucket does not exist. Set "+ config.AUTO_CREATE_BUCKET_CONFIG + " to true");
else throw new ConnectException("Bucket '" + bucketName + "' does not exist; create the bucket manually, or set '" + config.AUTO_CREATE_BUCKET_CONFIG + "' to true.");
Nit: wrap all blocks in braces, i.e. `else { ... }`.
Nit: `" + ...` (add a space before the plus).
for (SinkRecord record: records) {
  Schema kafkaValueSchema = schemaRetriever.retrieveValueSchema(record);
  Schema kafkaKeySchema = kafkaKeyFieldName.isPresent() ? schemaRetriever.retrieveKeySchema(record) : null;
  tableDescription = (kafkaValueSchema.doc() != null) ? kafkaValueSchema.doc() : tableDescription;
Nit: don't need the parentheses here
tableDescription = (kafkaValueSchema.doc() != null) ? kafkaValueSchema.doc() : tableDescription;
tableDescription = kafkaValueSchema.doc() != null ? kafkaValueSchema.doc() : tableDescription;
Not quite sure how to think about this, but in cases where sink records don't all have the same docstring, the last one will be used. Not something I think we need to worry about, but wanted to call it out.
public static final Boolean AUTO_CREATE_BUCKET_DEFAULT = true;
private static final ConfigDef.Importance AUTO_CREATE_BUCKET_IMPORTANCE = ConfigDef.Importance.MEDIUM;
private static final String AUTO_CREATE_BUCKET_DOC =
    "Whether to automatically create the given bucket, if it does not exist";
This only applies when GCS batch loading is enabled, right? If so, can we briefly clarify that here?
Yes! Will update the doc.
public static final Boolean ADD_NEW_BQ_FIELDS_DEFAULT = false;
private static final ConfigDef.Importance ADD_NEW_BQ_FIELDS_IMPORTANCE = ConfigDef.Importance.MEDIUM;
private static final String ADD_NEW_BQ_FIELDS_DOC =
    "If true, new fields can be added to BigQuery tables during subsequent schema updates";
How is this different from the existing schema update behavior?
I think we should eliminate the existing `autoUpdateSchemas` and just have `autoCreateBucket` and `addNewBigQueryFields`.
Bike shedding, but I think `allowNewBigQueryFields` is more explicit.
Ah, gotcha. So really we're replacing `autoUpdateSchemas` with a combination of `addNewBigQueryFields` and `changeRequiredFieldsToNullable`, right? That should cover the only two schema evolutions based on the BigQuery docs.
Will the unionization logic be configurable? Think we might want to make it toggle-able and even off by default, since it's a bit of a footgun if you accidentally send even just one bad record to the connector (a bunch of unnecessary columns will get added to your table and BigQuery doesn't allow deletion of columns, so you're pretty much stuck with them unless you want to delete and then recreate the table).
Yea, agree. It should be configurable, and off by default.
Hmm. Thought on this a bit more. I believe the update logic is configurable and off by default now.
We have `allowNewBigQueryFields` and `allowBigQueryRequiredFieldRelaxation` now. These both default to false. I think these effectively cover the unionization logic, since the check is:
if (!currentFields.containsKey(entry.getKey())) {
  if (allowNewBQFields && (entry.getValue().getMode().equals(Field.Mode.NULLABLE)
      || (entry.getValue().getMode().equals(Field.Mode.REQUIRED) && allowBQRequiredFieldRelaxation))) {
    ...
} else {
  if (currentFields.get(entry.getKey()).getMode().equals(Field.Mode.REQUIRED) && newFields.get(entry.getKey()).getMode().equals(Field.Mode.NULLABLE)) {
    if (allowBQRequiredFieldRelaxation) {
      currentFields.put(entry.getKey(), entry.getValue().toBuilder().setMode(Field.Mode.NULLABLE).build());
      ...
Since both default to false, schemas won't get updated by default.
@criccomini @stoynov96 @sahithi03 sorry to dredge this up again, but I think we might want to reconsider some of the logic here.
The prior behavior of the connector was to basically send a single record's schema to BigQuery and let validation happen there; the only permitted operations were (and still are) adding new columns to a table, and relaxing existing columns from `REQUIRED` to `NULLABLE`. This meant that it was possible to relax required fields to nullable, but only if there was a corresponding upstream schema change.
The new behavior of the connector still catches this case, but also automatically relaxes `REQUIRED` fields in the existing table schema to `NULLABLE` if they're missing from the most recent upstream schema.
This is risky, since it means that a single misplaced record with a completely disjoint schema from the existing table schema can cause permanent modifications to be made to the BigQuery table schema. Granted, this would require `allowNewBQFields` and `allowBQRequiredFieldRelaxation` to both be set to `true`, but it's not unreasonable for people to want to enable both with the expectation that they would cause the connector to act in the same way as it would have with `autoUpdateSchemas`.
I think we might still want to add a third config property, `allowSchemaUnionization`, that toggles the schema unionization behavior. If it's set to `false` and both `allowNewBQFields` and `allowBQRequiredFieldRelaxation` are set to `true`, then the prior behavior of the `autoUpdateSchemas` property should be preserved effectively for users who still want that.
WDYT?
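The interaction of the flags discussed in this thread could be sketched with plain types standing in for the BigQuery classes (the `allowSchemaUnionization` behavior and the merge rules below illustrate the proposal, not the connector's actual code):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SchemaUpdateSketch {
    enum Mode { REQUIRED, NULLABLE }

    /**
     * Merges an incoming record schema into the current table schema.
     * - New fields are added only when allowNewFields is set; fields the
     *   record declares REQUIRED additionally need allowRelaxation, since
     *   BigQuery can only add new columns as NULLABLE.
     * - Fields the incoming schema relaxes to NULLABLE are relaxed only
     *   when allowRelaxation is set.
     * - Existing REQUIRED fields *missing* from the incoming schema are
     *   relaxed only when allowUnionization is also set; with it off,
     *   the prior autoUpdateSchemas-style behavior is preserved.
     */
    static Map<String, Mode> merge(Map<String, Mode> current,
                                   Map<String, Mode> incoming,
                                   boolean allowNewFields,
                                   boolean allowRelaxation,
                                   boolean allowUnionization) {
        Map<String, Mode> result = new LinkedHashMap<>(current);
        for (Map.Entry<String, Mode> e : incoming.entrySet()) {
            if (!result.containsKey(e.getKey())) {
                if (allowNewFields && (e.getValue() == Mode.NULLABLE
                        || (e.getValue() == Mode.REQUIRED && allowRelaxation))) {
                    result.put(e.getKey(), Mode.NULLABLE); // new columns are added as NULLABLE
                }
            } else if (result.get(e.getKey()) == Mode.REQUIRED
                    && e.getValue() == Mode.NULLABLE
                    && allowRelaxation) {
                result.put(e.getKey(), Mode.NULLABLE);
            }
        }
        if (allowUnionization && allowRelaxation) {
            // Unionization: relax REQUIRED fields absent from the incoming schema
            for (Map.Entry<String, Mode> e : result.entrySet()) {
                if (!incoming.containsKey(e.getKey()) && e.getValue() == Mode.REQUIRED) {
                    e.setValue(Mode.NULLABLE);
                }
            }
        }
        return result;
    }
}
```

With `allowUnionization` off, a disjoint record can add columns but can never silently relax existing required columns, which is the footgun called out above.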
@@ -0,0 +1,28 @@
package com.wepay.kafka.connect.bigquery.retrieve;
Don't forget the copyright header :)
(On all new files)
transforms=RegexTransformation
transforms.RegexTransformation.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.RegexTransformation.regex=.*
transforms.RegexTransformation.replacement=$0
What's this supposed to do? AFAICT it's going to grab the entire topic (matched by `.*`), and then replace it with the entire match (`$0`)... so will this have any effect on the records here?
No, this will not have any effect on the records. The regex was just meant to be an example/placeholder :)
Ah, gotcha. In that case, can we do one of the following (no strong preference on my part for/against any of them):
- Add a comment explaining what this is and how someone might alter it to achieve table routing behavior
- Remove it
- Alter it to provide a more practical example (such as appending or stripping a prefix or suffix, or redirecting all records from one topic to a different topic but leaving all others unaffected)
I vote for:
Alter it to provide a more practical example (such as appending or stripping a prefix or suffix, or redirecting all records from one topic to a different topic but leaving all others unaffected)
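For example, a more practical placeholder along the lines of the last option might strip a hypothetical `kcbq-` prefix from topic names before routing (the prefix is purely illustrative):

```properties
transforms=RegexTransformation
transforms.RegexTransformation.type=org.apache.kafka.connect.transforms.RegexRouter
# Strip a "kcbq-" prefix: records from topic "kcbq-logins" are routed to table "logins"
transforms.RegexTransformation.regex=kcbq-(.*)
transforms.RegexTransformation.replacement=$1
```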
(Resolved conversation on ...egration-test/java/com/wepay/kafka/connect/bigquery/it/BigQueryConnectorIntegrationTest.java)
@@ -176,8 +173,15 @@ private Object convertField(Field fieldSchema, FieldValue field) {
List<Object> result = new ArrayList<>();
assert (rowSchema.size() == row.size());

for (int i=0; i < rowSchema.size(); i++) {
Nit: `i = 0` with spaces.
Bucket bucket = gcs.get(bucketName);
if (bucket != null) {
  logger.info("Deleting objects in the Bucket {}", bucketName);
  for (Blob blob: bucket.list().iterateAll()) {
Nit: `blob :` (add a space before the colon).
@@ -176,8 +173,15 @@ private Object convertField(Field fieldSchema, FieldValue field) {
List<Object> result = new ArrayList<>();
assert (rowSchema.size() == row.size());

for (int i=0; i < rowSchema.size(); i++) {
  if (rowSchema.get(i).getName().equals("row")) {
I'm confused. Why is this done twice, once with `!` and then the subsequent loop without?
The schema unionization logic creates fields in the BigQuery table which might not follow the same order as the fields in the schema.json file. The integration tests compare the actual and expected rows based on `testRow.get(0)`, which is supposed to be the `"row"` field. But since the fields would now be shuffled, I tried to add the `"row"` field values first in the `testRow` list to make sure that it is the first value. Then in the subsequent loop I skipped the `"row"` field values, so that these are not entered twice in the list. But I would want to know if there's a better way.
Not sure how generalized `indexOf()` can be in Java; can we use this to make this cleaner?
Edit: looks like we can't specify a comparator for it actually.
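One possible alternative to the two-pass approach: sort the schema/value pairs by field name before comparing, so the column order BigQuery returns no longer matters. A self-contained sketch with plain lists standing in for the BigQuery schema and row types:

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class RowOrdering {
    /**
     * Pairs each field name with its value and sorts the pairs by name,
     * so expected and actual rows can be compared element-by-element
     * regardless of the order the columns come back in.
     */
    static List<Object> valuesSortedByFieldName(List<String> fieldNames, List<?> values) {
        List<Map.Entry<String, Object>> pairs = new ArrayList<>();
        for (int i = 0; i < fieldNames.size(); i++) {
            pairs.add(new AbstractMap.SimpleEntry<String, Object>(fieldNames.get(i), values.get(i)));
        }
        // Deterministic order: sort by field name
        pairs.sort((a, b) -> a.getKey().compareTo(b.getKey()));
        List<Object> result = new ArrayList<>();
        for (Map.Entry<String, Object> p : pairs) {
            result.add(p.getValue());
        }
        return result;
    }
}
```

Applying this to both the expected and actual rows would replace the special-casing of the `"row"` field with a single deterministic ordering.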
(Resolved conversation on ...ector/src/integration-test/java/com/wepay/kafka/connect/bigquery/it/utils/BucketClearer.java)
private RecordConverter<Map<String, Object>> recordConverter;

public SinkRecordConverter(BigQuerySinkTaskConfig config) {
  this.config = config;
Would prefer this class takes recordConverter, boolean sanitizeFieldName, (optional) keyName, (optional) dataName. This makes the class cleaner to mock.
Will update accordingly.
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.*;
Wildcard again
@@ -100,11 +98,11 @@ public GCSToBQWriter(Storage storage,
 * @param blobName the name of the GCS blob to write.
 * @throws InterruptedException if interrupted.
 */
public void writeRows(List<RowToInsert> rows,
// writeRows -> needs to be changed
?
oops, left this here by mistake. Will remove it.
schemaManager.createTable(tableId, topic);
logger.info("Table {} does not exist, auto-created table for topic {}", tableId, topic);
schemaManager.createTable(tableId, records);
logger.info("Table {} does not exist, auto-created table ", tableId);
Log message should come before schemaManager.createTable
will update it.
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.*;
Wildcard
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.*;
Wildcard
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
Wildcard
try {
  while (currentIndex < rows.size()) {
    List<RowToInsert> currentBatch =
        rows.subList(currentIndex, Math.min(currentIndex + currentBatchSize, rows.size()));
List<Map.Entry<SinkRecord,RowToInsert>> currentBatchList =
Nit: `SinkRecord, RowToInsert` (add a space after the comma).
(Resolved conversation on ...nnector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/AdaptiveBigQueryWriter.java)
Codecov Report
@@              Coverage Diff                        @@
##     generalization-feature     #291      +/-     ##
============================================================
- Coverage          70.87%     66.10%    -4.77%
+ Complexity        301        267       -34
============================================================
  Files             32         32
  Lines             1538       1484      -54
  Branches          164        152       -12
============================================================
- Hits              1090       981       -109
- Misses            390        450       +60
+ Partials          58         53        -5
bufferSize=100000
maxWriteSize=10000
tableWriteWait=1000

transforms=RegexTransformation
transforms.RegexTransformation.type=org.apache.kafka.connect.transforms.RegexRouter
# .* is a placeholder for the actual regex
Maybe rephrase as something like "A placeholder regex router SMT that does nothing. Replace with relevant transformation SMT"
########################################### Fill me in! ###########################################
# The name of the BigQuery project to write to
project=
# The name of the BigQuery dataset to write to (leave the '.*=' at the beginning, enter your
# dataset after it)
datasets=.*=
# The location of a BigQuery service account JSON key file
keyfile=
nit: newline at the end of the file
import java.util.List;
import java.util.Properties;

nit: redundant new line
private void maybeEnsureExistingTable(TableId table) {
  BigQuery bigQuery = getBigQuery();
  if (bigQuery.getTable(table) == null && !config.getBoolean(config.TABLE_CREATE_CONFIG)) {
    logger.warn("You may want to enable auto table creation by setting {}=true in the properties file",
Can we append this warning to the exception message instead?
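A sketch of folding the hint into the exception message itself, so the caller throws one self-explanatory error instead of logging a separate warning (the helper and config key names here are illustrative):

```java
public class MissingTableError {
    /**
     * Builds a single error message that both reports the missing table
     * and tells the user how to enable automatic table creation.
     */
    static String missingTableMessage(String table, String tableCreateConfigKey) {
        return "Table '" + table + "' does not exist; create it manually, or set '"
            + tableCreateConfigKey + "=true' to enable automatic table creation.";
    }
}
```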
tableName = smtReplacement[0];
} else {
  throw new ConfigException("Incorrect regex replacement format. " +
      "SMT replacement should either follow <dataset>:<tableName> format or replace by <tableName> only.");
This message could be a bit unclear. I think it'd be clearer as: `SMT replacement should either produce the <dataset>:<tableName> format or just the <tablename> format`.
// package private for testing.
TableInfo constructTableInfo(TableId table, Schema kafkaKeySchema, Schema kafkaValueSchema) {
  com.google.cloud.bigquery.Schema bigQuerySchema = getBigQuerySchema(kafkaKeySchema, kafkaValueSchema);
private TableInfo getTableInfo(TableId table, Set<SinkRecord> records) {
Could you add some Javadocs on these new private methods? I think they're sufficiently complicated to warrant some docs.
Sure. Will update.
@@ -39,6 +41,6 @@ transforms.RegexTransformation.replacement=$0
project=
# The name of the BigQuery dataset to write to (leave the '.*=' at the beginning, enter your
This comment seems to be outdated now. We should update it
Will update
* Change Bucket Clearer
* Add RegexRouter Transform
* Change topicToTableResolver Methods
* Change EnsureExistingTables Method
* Change SchemaRegistrySchemaRetriever
* Add RecordNameStrategy
* Change standalone properties
* Delete TopicToTableResolver and TopicToTableResolverTest class
* Generalise schemaRetriever interface
* Change schemaRegistrySchemaRetriever
* Change TableClearer Class to clear all tables in a Dataset
* Add IdentitySchemaRetriever Class
* Change BigQuerySinkTask
* Remove TOPIC_TO_TABLES config from BigQuerySinkConfig class
* Modify BigQuerySinkConnectorTest and BigQuerySinkTaskTest
* Delete SchemaRegistrySchemaRetriever Class
* Delete MemorySchemaRetriever Class
* Change Writer Classes
* Modify bigQuerySinkTaskTest
* Change SchemaManagerTest Class
* Modify SMT

Co-authored-by: Sahithi Reddy Velma <[email protected]>
Changes made according to the Github issue - #245