Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: push Connect schema down #6200

Merged
merged 5 commits into from
Sep 16, 2020

Conversation

big-andy-coates
Copy link
Contributor

@big-andy-coates big-andy-coates commented Sep 14, 2020

Description

Following on from the fixes and refactors done in #6188, this commit pushes down the use of the Connect schema type to lower levels of the code. Higher levels of the code now deal with LogicalSchema, PersistentSchema or just a List<SimpleColumn>.

This moves us closer to removing the Connect schema from the code base, except in the Serde code that deals with connect formats.

LogicalSchema and PersistentSchema no longer know about the Connect schema type. Calls to retrieve the Connect schema from these types have been replaced with a util function that can convert a list of columns into a Struct Connect schema. As more code moves away from the Connect schema these util function calls will slowly be removed.

A good sign this is an improvement is that this PR deletes more code than it adds, and now error messages use SQL types, not connect types.

Testing done

usual

Reviewer checklist

  • Ensure docs are updated if necessary. (eg. if a user visible feature is being added or changed).
  • Ensure relevant issues are linked (description should include text like "Fixes #")

This commit fixes several issues and refactors a lot of the serde code around wrapping and unwrapping single values.

The main issues being fixed are:
  1. allow each format to define if it supported wrapping and/or unwrapping. (Not possible with current design)
  2. pass the correct wrapping / unwrapping flags are passed to key vs value formats when creating serde. (bug in code passes same SerdeOptions to key and value).
  3. register the correct wrapped / unwrapped schema with the SR. (bug in existing code meant registered format is always wrapped).

At the same time, the way wrapping / unwrapping was handled in the code wasn't great. Formats like `JSON` needed to be able to handle both wrapped and unwrapped schemas and values, depending on whether the user _explicitly_ set wrapping or unwrapping, vs the default behaviour of the format. This commit refactors the code such that the format will always be passed the a consistent schema and the set of serde features the format should use when creating the serde. This simplifies things and paves the way to user-define-serde.
Following on from the fixes and refactors done in confluentinc#6188, this commit pushes down the use of the Connect schema type to lower levels of the code. Higher levels of the code now deal with `LogicalSchema`, `PersistentSchema` or just a `List<SimpleColumn>`.

This moves us closer to removing the Connect schema from the code base, except in the Serde code that deals with connect formats.

`LogicalSchema` and `PersistentSchema` no longer know about the Connect schema type. Calls to retrieve the Connect schema from these types have been replaced with a util function that can convert a list of columns into a Struct Connect schema.  As more code moves away from the Connect schema these util function calls will slowly be removed.
@big-andy-coates big-andy-coates requested a review from a team as a code owner September 14, 2020 23:14
Copy link
Contributor

@agavra agavra left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! there's a perf consideration that I flagged for going forward, but it's only related to your change in that it might be harder to fix after your change than before

final LogicalSchema logicalSchema,
final SqlValueCoercer sqlValueCoercer
) {
final ConnectSchema keySchema = ConnectSchemas.columnsToConnectSchema(logicalSchema.key());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this was true before your change, but we should consider how this design fares with regards to performance. we should not be creating a schema per-record (as I think we are here) - the older code would have been easier to "cache" the result inside the data source

Copy link
Contributor Author

@big-andy-coates big-andy-coates Sep 16, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI: This is currently only used for inserts.

Totally agree we should be thinking about perf here. However, as the plan is to move away from using Connect's Struct type for keys, this conversion is only temporary.

Regardless, I'll pull the conversion higher up so its cached in my next PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@big-andy-coates big-andy-coates merged commit c57bdb1 into confluentinc:master Sep 16, 2020
@big-andy-coates big-andy-coates deleted the wrapping_2 branch September 16, 2020 08:37
big-andy-coates added a commit to big-andy-coates/ksql that referenced this pull request Sep 16, 2020
Following on from confluentinc#6200, this change changes the `Format` interface so that the created `Serde` is `Serde<List<?>>` rather than the previous `Serde<Struct>`, i.e. the format returns serde capable of returning a list of values, rather than requiring those values in a Connect `Struct`.

This is part of the work of moving away from using Connect's `Struct` type internally.

All the 'ConnectFormat' sub-classes still internally using `Serde<Struct>` for now, as that's what the Connect code returns.  This change is not trying to fix this.

`DELIMITED` and `KAFKA` formats no longer know about any `Connect` types, i.e. no `Struct` and no `Schema`. Yay.

In the engine the key is still passed around as a `Struct`. This change is not trying to fix this.  So there is code to convert the returned `List<?>` into the key `Struct`.  However, this will go once the key is no longer a `Struct`, and its only a single, primitive field, so the overhead is very low, and the `KAFKA` format was doing this already internally anyway, (so no change in perf).

Of course, any one value in the returned `List<?>` can still be a Connect `Struct` if the column type is a struct. This change is not trying to fix this.  This should be fixed later.

Serde performance for JSON and AVRO remains the same, (no benchmark available for other formats yet).

Previous benchmark results (ran locally):

```
Benchmark                   (schemaName)  (serializationFormat)  Mode  Cnt  Score   Error  Units
SerdeBenchmark.deserialize   impressions                   JSON  avgt    3  1.148 ± 0.205  us/op
SerdeBenchmark.deserialize   impressions                   Avro  avgt    3  2.056 ± 1.141  us/op
SerdeBenchmark.deserialize       metrics                   JSON  avgt    3  5.504 ± 1.535  us/op
SerdeBenchmark.deserialize       metrics                   Avro  avgt    3  7.564 ± 3.445  us/op
SerdeBenchmark.serialize     impressions                   JSON  avgt    3  0.556 ± 0.085  us/op
SerdeBenchmark.serialize     impressions                   Avro  avgt    3  1.420 ± 3.746  us/op
SerdeBenchmark.serialize         metrics                   JSON  avgt    3  2.909 ± 1.162  us/op
SerdeBenchmark.serialize         metrics                   Avro  avgt    3  5.076 ± 0.383  us/op
```

Latest benchmark results (no statistically significant change):

```
Benchmark                   (schemaName)  (serializationFormat)  Mode  Cnt  Score   Error  Units
SerdeBenchmark.deserialize   impressions                   JSON  avgt    3  1.191 ± 0.520  us/op
SerdeBenchmark.deserialize   impressions                   Avro  avgt    3  2.032 ± 0.512  us/op
SerdeBenchmark.deserialize       metrics                   JSON  avgt    3  5.324 ± 1.457  us/op
SerdeBenchmark.deserialize       metrics                   Avro  avgt    3  7.604 ± 2.389  us/op
SerdeBenchmark.serialize     impressions                   JSON  avgt    3  0.589 ± 0.027  us/op
SerdeBenchmark.serialize     impressions                   Avro  avgt    3  1.307 ± 0.147  us/op
SerdeBenchmark.serialize         metrics                   JSON  avgt    3  2.752 ± 0.583  us/op
SerdeBenchmark.serialize         metrics                   Avro  avgt    3  5.005 ± 0.780  us/op
```
big-andy-coates added a commit that referenced this pull request Sep 21, 2020
Following on from #6200, this change changes the `Format` interface so that the created `Serde` is `Serde<List<?>>` rather than the previous `Serde<Struct>`, i.e. the format returns serde capable of returning a list of values, rather than requiring those values in a Connect `Struct`.

This is part of the work of moving away from using Connect's `Struct` type internally.

All the 'ConnectFormat' sub-classes still internally using `Serde<Struct>` for now, as that's what the Connect code returns.  This change is not trying to fix this.

`DELIMITED` and `KAFKA` formats no longer know about any `Connect` types, i.e. no `Struct` and no `Schema`. Yay.

In the engine the key is still passed around as a `Struct`. This change is not trying to fix this.  So there is code to convert the returned `List<?>` into the key `Struct`.  However, this will go once the key is no longer a `Struct`, and its only a single, primitive field, so the overhead is very low, and the `KAFKA` format was doing this already internally anyway, (so no change in perf).

Of course, any one value in the returned `List<?>` can still be a Connect `Struct` if the column type is a struct. This change is not trying to fix this.  This should be fixed later.

Serde performance for JSON and AVRO remains the same, (no benchmark available for other formats yet).

Previous benchmark results (ran locally):

```
Benchmark                   (schemaName)  (serializationFormat)  Mode  Cnt  Score   Error  Units
SerdeBenchmark.deserialize   impressions                   JSON  avgt    3  1.148 ± 0.205  us/op
SerdeBenchmark.deserialize   impressions                   Avro  avgt    3  2.056 ± 1.141  us/op
SerdeBenchmark.deserialize       metrics                   JSON  avgt    3  5.504 ± 1.535  us/op
SerdeBenchmark.deserialize       metrics                   Avro  avgt    3  7.564 ± 3.445  us/op
SerdeBenchmark.serialize     impressions                   JSON  avgt    3  0.556 ± 0.085  us/op
SerdeBenchmark.serialize     impressions                   Avro  avgt    3  1.420 ± 3.746  us/op
SerdeBenchmark.serialize         metrics                   JSON  avgt    3  2.909 ± 1.162  us/op
SerdeBenchmark.serialize         metrics                   Avro  avgt    3  5.076 ± 0.383  us/op
```

Latest benchmark results (no statistically significant change):

```
Benchmark                   (schemaName)  (serializationFormat)  Mode  Cnt  Score   Error  Units
SerdeBenchmark.deserialize   impressions                   JSON  avgt    3  1.191 ± 0.520  us/op
SerdeBenchmark.deserialize   impressions                   Avro  avgt    3  2.032 ± 0.512  us/op
SerdeBenchmark.deserialize       metrics                   JSON  avgt    3  5.324 ± 1.457  us/op
SerdeBenchmark.deserialize       metrics                   Avro  avgt    3  7.604 ± 2.389  us/op
SerdeBenchmark.serialize     impressions                   JSON  avgt    3  0.589 ± 0.027  us/op
SerdeBenchmark.serialize     impressions                   Avro  avgt    3  1.307 ± 0.147  us/op
SerdeBenchmark.serialize         metrics                   JSON  avgt    3  2.752 ± 0.583  us/op
SerdeBenchmark.serialize         metrics                   Avro  avgt    3  5.005 ± 0.780  us/op
```

Co-authored-by: Andy Coates <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants