
feat: support numeric json serde for decimals #3588

Merged 2 commits into confluentinc:master from numeric_json on Oct 24, 2019

Conversation

Contributor

@agavra agavra commented Oct 16, 2019

Description

There really are only 3 pretty small changes in this PR:

  • change KsqlJsonDeserializer to bypass JsonConverter for deserialization. We were already enforcing all the types ourselves, which walks the entire tree, so the converter pass was redundant. This also helps with decimals: JsonConverter cannot deserialize to a decimal without a supplied schema, and it only accepts schemas in the form of a wrapped payload.
  • change KsqlJsonDeserializer to be able to deserialize columns with byte schemas (see enforceValidBytes), and make sure that we deserialize with BigDecimal for floating point values
  • change KsqlJsonSerializer to use the config that serializes using numeric types
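For context on why the serializer config matters: Connect's JsonConverter has historically represented a logical Decimal as the base64-encoded bytes of its unscaled value (the scale is carried in the schema), whereas the numeric config emits the decimal as a plain JSON number. A minimal stdlib sketch contrasting the two representations (class and method names are hypothetical, not KSQL's API):

```java
import java.math.BigDecimal;
import java.util.Base64;

// Sketch of the two JSON representations of a decimal (names hypothetical).
public class DecimalJsonSketch {

  // Numeric form: emit the decimal value verbatim.
  public static String numericForm(BigDecimal d) {
    return d.toPlainString();
  }

  // Connect-style binary form: base64 of the unscaled value's bytes;
  // the scale is not in the payload at all, only in the schema.
  public static String base64Form(BigDecimal d) {
    return Base64.getEncoder().encodeToString(d.unscaledValue().toByteArray());
  }

  public static void main(String[] args) {
    BigDecimal val = new BigDecimal("1.1234512345123451234"); // DECIMAL(20,19)
    System.out.println("numeric: " + numericForm(val));
    System.out.println("base64:  " + base64Form(val));
  }
}
```

The numeric form is what shows up in the `PRINT 'decimals'` output below.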

The rest is the multitude of test framework changes that I needed to do to test this PR:

  • ValueSpec was removed, replaced by ExpectedRecordComparator
  • Record contains the original JSON that specified it and that is used to compare the actual value instead of going through an extra Serde
  • I added avroToJson in the test utils so that the SchemaTranslationTest, which generates avro records, can also generate the spec from it. In the future I plan on removing the avroToValueSpec method in that class but that is a massive refactor that wasn't working out in my favor 😢

Testing done

  • QTT test
  • Unit tests
  • Local testing
ksql> CREATE STREAM DECIMALS (val DECIMAL(20,19)) WITH (kafka_topic='decimals', value_format='json', partitions=1);

 Message
----------------
 Stream created
----------------

ksql> INSERT INTO DECIMALS (val) VALUES ('1.1234512345123451234');

ksql> SET 'auto.offset.reset'='earliest';
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.

ksql> SELECT * FROM DECIMALS EMIT CHANGES;
+------------------------------+------------------------------+------------------------------+
|ROWTIME                       |ROWKEY                        |VAL                           |
+------------------------------+------------------------------+------------------------------+
|1571184431963                 |null                          |1.1234512345123451234         |
^CQuery terminated


ksql> PRINT 'decimals' FROM BEGINNING;
Format:JSON
{"ROWTIME":1571184431963,"ROWKEY":"null","VAL":1.1234512345123451234}
^CTopic printing ceased

ksql> CREATE STREAM copy AS SELECT * FROM decimals;

 Message
------------------------------------------------------------------------------
 Stream COPY created and running. Created by query with query ID: CSAS_COPY_2
------------------------------------------------------------------------------

ksql> PRINT COPY FROM BEGINNING;
Format:JSON
{"ROWTIME":1571184431963,"ROWKEY":"null","VAL":1.1234512345123451234}
^CTopic printing ceased

Reviewer checklist

  • Ensure docs are updated if necessary. (eg. if a user visible feature is being added or changed).
  • Ensure relevant issues are linked (description should include text like "Fixes #")

@agavra agavra requested a review from a team as a code owner October 16, 2019 00:32
Contributor

@purplefox purplefox left a comment


LGTM, just a couple of comments/questions

return context.deserializer.gson.toJson(context.val);
if (context.val instanceof ObjectNode) {
try {
// this ensure sorted order, there's an issue with Jackson where just enabling
Contributor


Can you explain why sorted order is necessary?

Contributor Author


This is necessary because some of our tests assert on the serialized JSON output as a string. Specifically, if you look at simple-struct.json we specify a stream:

CREATE STREAM orders (... itemid VARCHAR, ...

But when we produce to it, we produce JSON structs:

            "ITEMID": {
              "ITEMID": 6,
              "CATEGORY": {
                "ID": 2,
                "NAME": "Food"
              },
              "NAME": "Item_6"
            },

Then when we check to see that we have the right value, we check for:

            "ITEMID": "{\"CATEGORY\":{\"ID\":2,\"NAME\":\"Food\"},\"ITEMID\":6,\"NAME\":\"Item_6\"}",

Note that this is ordered alphabetically, not in the order it was produced. Something I did (probably bypassing JsonConverter) made us no longer sort it, so I added that back for consistency.
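The alphabetical ordering in that expected string is what you get when the fields are serialized through a sorted map; a stdlib sketch of the effect (hypothetical helper, not the production serde):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Serializing through a TreeMap yields the alphabetical key order seen in
// the expected spec string, regardless of the order the struct was
// produced in. Names here are hypothetical.
public class SortedKeysSketch {

  public static String render(Map<String, String> fields) {
    final Map<String, String> sorted = new TreeMap<>(fields); // alphabetical keys
    return sorted.entrySet().stream()
        .map(e -> "\"" + e.getKey() + "\":" + e.getValue())
        .collect(Collectors.joining(",", "{", "}"));
  }

  public static String demo() {
    final Map<String, String> produced = new LinkedHashMap<>(); // production order
    produced.put("ITEMID", "6");
    produced.put("CATEGORY", "{\"ID\":2,\"NAME\":\"Food\"}");
    produced.put("NAME", "\"Item_6\"");
    return render(produced); // CATEGORY sorts before ITEMID before NAME
  }
}
```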

Contributor

@purplefox purplefox Oct 17, 2019


I would worry about adding sorting (sorting is potentially a slow operation) in Serde code which is potentially performance sensitive. Or putting any code in production code that's only there to satisfy test code. Is there no way to sort the output in the test before you assert it?

Contributor Author


Let me dig into it a little more to see where the sorting was being done, but my intention was less to satisfy the test and more to maintain backwards compatibility. If someone was depending on this behavior, we can't just remove it.

Contributor


Ok makes sense. But JSON objects have no defined order (as per spec), so if anyone is relying on that order, I'd consider that a bug in their code, unless we have specifically documented that we sort our JSON alphabetically. The reason I say that is that we don't want to be shackled with the burden of having to sort it forever, which could become a real problem when we want to optimise our serde code for the best performance :)

Contributor


Do we still need the sorted deserialization? It looks like you updated the tests to not assume sorted order (?)

Contributor Author


I did not update the tests (which I guess is confusing, because they don't show up 😂) - if we want to do this it should be in a separate PR (#3664)

Contributor


I'm with Tim here - we shouldn't be paying the cost in our production code just to satisfy the tests. We should look to remove this with all haste!

I've tagged the above issue as 6.0, though I think that's the latest we should fix it.

@agavra agavra requested a review from a team October 16, 2019 17:02
},
{
"name": "validate int elements OK",
"format": ["KAFKA"],
Contributor Author


I split out KAFKA and DELIMITED formats because they serialize slightly differently in tests

@@ -201,14 +201,14 @@
"inputs": [
{"topic": "test_topic", "key": 0, "value": {"value":[1.0, 1.0]}},
{"topic": "test_topic", "key": 0, "value": {"value":[2.0, 2.0]}},
{"topic": "test_topic", "key": 0, "value": {"value":[9223372036854775807.0, 1.0]}},
{"topic": "test_topic", "key": 0, "value": {"value":[922337203685.0, 1.0]}},
Contributor Author


this test previously was incorrect - JSON serialized the value as a long, which can hold the old value exactly, but since the column is specified as a double this number has more precision than a double can hold
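The precision point can be checked directly: a double carries a 53-bit mantissa, so 9223372036854775807 (Long.MAX_VALUE, 63 significant bits) cannot survive a round trip through a double, while the replacement value 922337203685 can. A small stdlib check (hypothetical helper name, for illustration only):

```java
import java.math.BigDecimal;

// Round-trip a decimal literal through a double; any rounding shows up
// as a mismatch against the exact value.
public class DoublePrecisionSketch {

  public static boolean fitsExactlyInDouble(String literal) {
    final BigDecimal exact = new BigDecimal(literal);
    return new BigDecimal(exact.doubleValue()).compareTo(exact) == 0;
  }

  public static void main(String[] args) {
    System.out.println(fitsExactlyInDouble("9223372036854775807")); // false
    System.out.println(fitsExactlyInDouble("922337203685"));        // true
  }
}
```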

@agavra agavra requested a review from purplefox October 23, 2019 16:58

private static Map<String, JsonNode> upperCaseKeys(final ObjectNode map) {
final Map<String, JsonNode> result = new HashMap<>(map.size());
for (Iterator<Entry<String, JsonNode>> it = map.fields(); it.hasNext(); ) {
final Entry<String, JsonNode> entry = it.next();
// what happens if we have two fields with the same name and different case?
Contributor


Indeed, what does happen? ;) Maybe we should have a test for this?

Contributor Author


It's a little confusing to think about... but I think this is what happens:

  • If there's an exact case-sensitive match, this does nothing because the value isn't used
  • If there's only one case-insensitive match and no case-sensitive match (e.g. schema foo VARCHAR and value {"foo": "bar"}), then it maps field foo to "bar".
  • If there's more than one case-insensitive match and no case-sensitive match for either (e.g. schema foo VARCHAR and value {"foo": "bar", "fOO": "baz"}) then the behavior is undefined - it will choose the latter of the two

I think this behavior is probably OK. It's unfortunate that JSON allows case-sensitive field names, because we have three options:

  1. We force case sensitive names for KSQL when using JSON. This is crappy user experience for the 99% of use cases that don't hit this issue
  2. We accept this limitation, but have the workaround that you can specify case sensitive names if you need to.
  3. We don't accept the case-insensitive fields when it's ambiguous (throw an error) but accept it in all other scenarios. The workaround in 2 is still available.

I prefer the third because it makes the simple things simple and the complicated things possible, but it's technically backwards-incompatible - I think it might be OK because we'd be changing a super buggy behavior. I'll create a separate PR for the test and change.
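The matching behavior described in the bullets can be sketched as an exact lookup with a case-insensitive fallback (a simplified illustration, not the actual KsqlJsonDeserializer code; names are hypothetical):

```java
import java.util.HashMap;
import java.util.Map;

// Exact (case-sensitive) matches win outright; otherwise fall back to an
// upper-cased index of the JSON fields. When two case-variants exist and
// neither matches exactly, whichever variant the map iteration visits last
// overwrites the other -- which is why that case is effectively undefined.
public class CaseInsensitiveLookupSketch {

  public static Object lookup(Map<String, Object> jsonFields, String ksqlField) {
    final Object exact = jsonFields.get(ksqlField);
    if (exact != null) {
      return exact; // case-sensitive hit: fallback never consulted
    }
    final Map<String, Object> upperCased = new HashMap<>(jsonFields.size());
    for (Map.Entry<String, Object> e : jsonFields.entrySet()) {
      upperCased.put(e.getKey().toUpperCase(), e.getValue()); // collisions overwrite
    }
    return upperCased.get(ksqlField.toUpperCase());
  }

  public static Object demo() {
    final Map<String, Object> record = new HashMap<>();
    record.put("foo", "bar"); // unquoted column foo is upper-cased to FOO at parse time
    return lookup(record, "FOO");
  }
}
```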

Contributor


  • We force case sensitive names for KSQL when using JSON. This is crappy user experience for the 99% of use cases that don't hit this issue

That's not going to float ;)

  • We accept this limitation, but have the workaround that you can specify case sensitive names if you need to.

Surely, this falls under the remit of the quoted identifier work? As part of that it's probably worth checking that if a user supplies a quoted column name that we pick the right one.

  • We don't accept the case-insensitive fields when it's ambiguous (throw an error) but accept it in all other scenarios. The workaround in 2 is still available.

This is not really an option. We don't know what the data looks like when the user is providing us with the schema. What's more, the data can evolve and change over time. So maybe the first record we see only has foo in it, but the next record has foo and Foo.

Also, while I'm a fan of 'fail early and fail hard' for many things, I don't think throwing an error here is the right option. If an upstream system suddenly starts publishing a second case-variant of one of the fields you're using, you'll be mighty pissed if KSQL suddenly starts rejecting all the records.

I don't think there is a perfect answer here. But I'd probably think about going with the approach that:

  • if the user supplies a quoted column name, then only that case should match. (Same for avro).
  • If the user supplies an unquoted column name, then it should match one of the fields. Which field should be deterministic, e.g. the last encountered (as it is now), or maybe first/last given some other sort order - though I'm not sure what a different sort order would give us above 'the last one we encounter'.

Does our recent work on quoted identifiers get us anywhere close to these two points?

@agavra agavra merged commit 8621594 into confluentinc:master Oct 24, 2019
@agavra agavra deleted the numeric_json branch October 24, 2019 21:41
Contributor

@big-andy-coates big-andy-coates left a comment


Thanks @agavra

Sorry for the late review, but there are a few issues I've listed below.

Also, I'm wondering if we still support Connect's funky decimals or not? If so, it probably needs documenting somewhere in the .. erm... documentation! :D

Great to see this done pre-release 🥇

return false;
}

expected.isBigDecimal();
Contributor


what does this line do?

Contributor Author


woops! I used IntelliJ "remove unused variable" and it kept the side-effect...

final Optional<Long> timestamp,
final WindowData window
) {
this.topic = topic;
this.key = key;
this.value = value;
this.jsonValue = jsonValue;
Contributor


nit: null check



for (Field ksqlField : context.schema.fields()) {
// the "case insensitive" strategy leverages that all KSQL fields are internally
// case sensitive - if they were specified without quotes, then they are upper-cased
// during parsing. any ksql fields that are case insensitive, therefore, will be matched
// in this case insensitive field map without modification but the quoted fields will not
// (unless they were all uppercase to start off with, which is expected to match)
final Object fieldValue = ObjectUtils.defaultIfNull(
final JsonNode fieldValue = ObjectUtils.defaultIfNull(
Contributor


side note:

Shame we pay the price of the lookup into upperCasedFields even if jsonFields.get(ksqlField.name()) returns non-null.

Contributor Author


+1 good catch

