Decouple topics from tables and generalize SchemaRetriever #245

Open
criccomini opened this issue Feb 4, 2020 · 8 comments
@criccomini
Contributor

criccomini commented Feb 4, 2020

Intro

We have had a number of PRs and issues recently that are attempting to do two things:

  1. Support more serialization formats
  2. Support multiple schemas in the same topic

These include:

#238
#216
#175
#206
#178

And more.

Changes

I believe that we can address these issues with the following changes:

Add TableRouter

  1. Adding a pluggable TableRouter that takes a SinkRecord and returns the table it should be written to. (Default: RegexTableRouter)
public interface TableRouter {
  void configure(Map<String, String> properties);
  TableId getTable(SinkRecord sinkRecord);
}
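
For illustration, here is a minimal sketch of what the default regex-based router could look like. It assumes the BigQuery client's TableId type, and the topic.regex, table.format, project, and dataset property names are hypothetical, not existing connector configuration.

import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import com.google.cloud.bigquery.TableId;
import org.apache.kafka.connect.sink.SinkRecord;

// Hypothetical sketch: derive the destination table from the record's topic
// via a configurable regex/replacement pair.
public class RegexTableRouter implements TableRouter {
  private Pattern topicPattern;
  private String tableFormat;
  private String project;
  private String dataset;

  @Override
  public void configure(Map<String, String> properties) {
    // Property names are illustrative only.
    topicPattern = Pattern.compile(properties.get("topic.regex"));
    tableFormat = properties.get("table.format");
    project = properties.get("project");
    dataset = properties.get("dataset");
  }

  @Override
  public TableId getTable(SinkRecord sinkRecord) {
    Matcher matcher = topicPattern.matcher(sinkRecord.topic());
    // Fall back to the raw topic name if the regex doesn't match.
    String table = matcher.matches() ? matcher.replaceAll(tableFormat) : sinkRecord.topic();
    return TableId.of(project, dataset, table);
  }
}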

Generalize SchemaRetriever

  1. Changing the SchemaRetriever interface to have two methods: Schema retrieveKeySchema(SinkRecord) and Schema retrieveValueSchema(SinkRecord). (Default: IdentitySchemaRetriever, which just returns sinkRecord.keySchema() and sinkRecord.valueSchema())
public interface SchemaRetriever {
  void configure(Map<String, String> properties);
  Schema retrieveKeySchema(SinkRecord sinkRecord);
  Schema retrieveValueSchema(SinkRecord sinkRecord);
}
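
The default would then be trivial; a sketch of the identity behavior described above:

import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.sink.SinkRecord;

// Sketch of the proposed default: no external lookups, just the schemas the
// converter already attached to the SinkRecord.
public class IdentitySchemaRetriever implements SchemaRetriever {
  @Override
  public void configure(Map<String, String> properties) {
    // Nothing to configure.
  }

  @Override
  public Schema retrieveKeySchema(SinkRecord sinkRecord) {
    return sinkRecord.keySchema();
  }

  @Override
  public Schema retrieveValueSchema(SinkRecord sinkRecord) {
    return sinkRecord.valueSchema();
  }
}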

Change *QueryWriter schema update logic

  1. Changing the way that schema updates are handled in the AdaptiveBigQueryWriter.

This change will be the largest. If we remove SchemaRetriever.updateSchema, we need a way for AdaptiveBigQueryWriter to update BQ schemas when a batch insert fails. Given these rules:

https://cloud.google.com/bigquery/docs/managing-table-schemas

This document describes how to modify the schema definitions for existing BigQuery tables. BigQuery natively supports the following schema modifications:

  • Adding columns to a schema definition
  • Relaxing a column's mode from REQUIRED to NULLABLE
  • It is valid to create a table without defining an initial schema and to add a schema definition to the table at a later time.

All other schema modifications are unsupported and require manual workarounds, including:

  • Changing a column's name
  • Changing a column's data type
  • Changing a column's mode (aside from relaxing REQUIRED columns to NULLABLE)
  • Deleting a column

The correct behavior when a schema failure occurs is to have the adaptive writer union all fields from all records in the insert batch with all fields in the existing BigQuery table. An example illustrates:

Existing BQ Schema

a    INTEGER    REQUIRED
b    INTEGER    NULLABLE

Batch Insert Schemas

Row 1
a    INTEGER    NULLABLE
Row 2
c    INTEGER    NULLABLE
Row 3
a    INTEGER    REQUIRED

Final Schema

a    INTEGER    NULLABLE
b    INTEGER    NULLABLE
c    INTEGER    NULLABLE
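
In code, that field-union step might look roughly like the sketch below. It uses the BigQuery client's Field and Schema types, ignores nested and repeated fields (which would need the same treatment recursively), and the method name unionize is illustrative.

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.Schema;

// Sketch: union the existing table schema with the BigQuery schemas derived
// from every record in the failed batch. New columns are added as NULLABLE,
// and REQUIRED columns are relaxed to NULLABLE when any record supplies them
// as NULLABLE. Type changes and deletions are left alone, since BigQuery
// forbids them; such inserts will still fail and surface an error.
static Schema unionize(Schema existing, List<Schema> batchSchemas) {
  Map<String, Field> fields = new LinkedHashMap<>();
  for (Field field : existing.getFields()) {
    fields.put(field.getName(), field);
  }
  for (Schema recordSchema : batchSchemas) {
    for (Field field : recordSchema.getFields()) {
      Field current = fields.get(field.getName());
      if (current == null) {
        // Column not in the table yet: add it, but never as REQUIRED.
        fields.put(field.getName(), field.toBuilder().setMode(Field.Mode.NULLABLE).build());
      } else if (current.getMode() == Field.Mode.REQUIRED && field.getMode() != Field.Mode.REQUIRED) {
        fields.put(field.getName(), current.toBuilder().setMode(Field.Mode.NULLABLE).build());
      }
    }
  }
  return Schema.of(fields.values().toArray(new Field[0]));
}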

This will require that we have access to the SinkRecord for each row in the insert batch (not just the RowToInsert). It will also require that AdaptiveBigQueryWriter has the SchemaRetriever wired in as well.

I think the most straightforward way to handle this is to have TableWriterBuilder.addRow(SinkRecord record) instead of addRow(RowToInsert row). The builder can then keep a SortedMap<SinkRecord, RowToInsert> and pass that down the stack through to AdaptiveBigQueryWriter.performWriteRequest. AdaptiveBigQueryWriter.attemptSchemaUpdate can then be changed to implement the field-union logic that I described above.
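
A rough sketch of that builder change follows. SinkRecord isn't Comparable, so the map needs an explicit ordering (by partition and offset here); the class name and the converter function are placeholders rather than the connector's actual types.

import java.util.Comparator;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.function.Function;
import com.google.cloud.bigquery.InsertAllRequest.RowToInsert;
import org.apache.kafka.connect.sink.SinkRecord;

// Sketch: keep the original SinkRecord next to each converted RowToInsert so
// the writer can still get at the record schemas when an insert fails.
public class SinkRecordTableWriterBuilder {
  // SinkRecord is not Comparable, so order the map by partition and offset.
  private final SortedMap<SinkRecord, RowToInsert> rows = new TreeMap<>(
      Comparator.<SinkRecord>comparingInt(SinkRecord::kafkaPartition)
          .thenComparingLong(SinkRecord::kafkaOffset));
  // Stand-in for the connector's existing SinkRecord -> RowToInsert conversion.
  private final Function<SinkRecord, RowToInsert> recordConverter;

  public SinkRecordTableWriterBuilder(Function<SinkRecord, RowToInsert> recordConverter) {
    this.recordConverter = recordConverter;
  }

  public void addRow(SinkRecord record) {
    rows.put(record, recordConverter.apply(record));
  }

  public SortedMap<SinkRecord, RowToInsert> rows() {
    // Passed down the stack to AdaptiveBigQueryWriter.performWriteRequest.
    return rows;
  }
}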

One area that we'll have to be particularly careful about is dealing with repeated records and nested structures. They need to be properly unioned as well.

Benefits

This approach should give us a ton of flexibility, including:

  • It will allow you to pick which table each individual message is routed to, based on all of the information in the SinkRecord (topic, key schema, value schema, message payload, etc.)
  • It will allow us to fix some known schema-evolution bugs in cases where one field is added and another is dropped.
  • It will allow us to use SinkRecord's .keySchema and .valueSchema rather than talking to the schema registry for schemas.
  • It will make it easy to support JSON messages, even those that return null for the .keySchema() and .valueSchema() methods; you can implement a custom retriever for this case.
@mkubala
Contributor

mkubala commented Feb 5, 2020

I'm really glad to read about the pluggable TableRouter! This will address a lot of issues with API-implementation separation.
Is this still a draft, or is somebody already working on it?
I'm going to rebase #238 as soon as the new interface arrives :)

@criccomini
Contributor Author

@mkubala no one is working on this at the moment. Our hope was that you would introduce the TableRouter as part of your PR.

@sebco59

sebco59 commented May 4, 2020

@criccomini Do you have any update on this issue? Is someone working on it?
On our side, we need multi-schema support (#238) and we would be glad to help with it.

@sebco59

sebco59 commented May 6, 2020


@bingqinzhou @mtagle Do you have any information?

@criccomini
Contributor Author

@sebco59 we have someone beginning this now! :)

@sebco59

sebco59 commented Jun 3, 2020

@criccomini Nice!

On our side, we tried to use a Single Message Transformation (SMT) to deal with multiple schemas instead of a TableRouter implementation.

But SMTs don't work because of the way the schema is retrieved in SchemaRegistrySchemaRetriever#getSubject. If we change it, it will open the door for all SMTs, with no need to implement a TableRouter.

  • With SMTs, we could move a lot of complexity out of the connector (see the configuration example after this list):
    • we want to deal with multiple subjects -> an SMT can change the topic name on the fly
    • we want to add Kafka system information (offset, partition, ...) -> an SMT can add fields with these values on the fly
    • we want to route a topic to a specific BQ table depending on a field -> an SMT can change the topic name with a regex
    • we want to add a record header to the BQ record -> an SMT can add a field... and more
  • With SMTs, we get the default error handling from Kafka Connect.
  • With SMTs, we could reuse them in other connectors.
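
For example, the Kafka system information case is already covered by the stock InsertField transformation; a connector configuration snippet could look like this (the target field names kafka_offset, kafka_partition, and kafka_topic are just examples):

transforms=insertKafkaMeta
transforms.insertKafkaMeta.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.insertKafkaMeta.offset.field=kafka_offset
transforms.insertKafkaMeta.partition.field=kafka_partition
transforms.insertKafkaMeta.topic.field=kafka_topic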

We have started developing the schema retriever logic, but we tried to decouple it from the schema update logic. With the current implementation, the BQ table schema updates are entirely based on the latest version available in the schema registry, which doesn't cope well with messages carrying an older, incompatible schema.
With the schema retriever, it's still straightforward to handle homogeneous batches (as far as the schema is concerned): you can simply use any message from the batch to try to update the BQ table schema.
To handle heterogeneous batches, we could rely on retry logic that incrementally updates the BQ table schema until it stabilizes (using the first failed message in the batch to attempt each update).
That would be a simple alternative until the field-union logic is implemented.

What do you think of this approach (SMT and schema update logic)?
Can we contribute too?

Update:
You can find our quick-and-dirty dev work here: sebco59/kafka-connect-bigquery -> branch allow-smt

@C0urante
Collaborator

@criccomini This seems like a great abstraction to place over the connector and allow it to service a variety of use cases. Excited to see this thing still growing and evolving as the years go by :)

I think @sebco59's suggestion to rely on SMTs is a valuable one. A lot of the problems that are being addressed here aren't specific to BigQuery as an external system and can apply to connectors in general. An SMT is a great way to write code once and reuse it anywhere, and also should help keep the code base for this connector uncluttered. Specifically, I think users should either use the RegexRouter SMT or implement their own instead of the suggested TableRouter interface. Being able to modify the topic of a SinkRecord via an SMT should be enough to give arbitrary control over which table it ends up being routed to.
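
For instance, all topics matching a pattern can already be collapsed onto a single table with nothing but configuration; the topic pattern and replacement below are illustrative:

transforms=route
transforms.route.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.route.regex=orders-(.*)
transforms.route.replacement=orders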

With regard to the comment about modifying SchemaRegistrySchemaRetriever#getSubject to be able to handle topics with heterogeneous schemas, I think there's some good news and some bad news. Good news: there's already a pluggable interface with a few out-of-the-box implementations to handle this use case, the subject name strategy interface. We could allow users of the SchemaRegistrySchemaRetriever to specify a subject.name.strategy to use, which could default to the TopicNameStrategy (which mirrors the current behavior of the retriever). Bad news: this doesn't address the possibility of heterogeneous topics for any other SchemaRetriever implementation. Because of this, I think we should still expand the API to rely on a SinkRecord (and possibly also a TableId, just to best preserve existing behavior), although it'd probably also be nice to add support for user-specified subject name strategies to the SchemaRegistrySchemaRetriever.

I also think that if we evolve the SchemaRetriever API, we should be careful to add default implementations for the new methods so that SchemaRetriever instances developed against the current API will still work normally against newer versions of the connector.

On the subject of automatic schema unionization, I'm wondering about a potential gotcha: if the connector reads a record with a completely mismatched schema that was not meant to be in the topic to begin with, will automatic unionization of the table still happen? I think it will, as long as any fields present in that record's schema which aren't already in the table are optional. We probably want to be cautious about accidentally adding columns to a table schema, since deleting them isn't permitted.

@criccomini
Contributor Author

(We are using the SMT approach now. PR is forthcoming :) )
