fix: use JsonSchemaConverter to support JSON anyOf types #9130
Conversation
Force-pushed from 0e27371 to 31f4bab.
@@ -232,6 +233,12 @@ private Object toKsqlValue(
      case STRING:
        // use String.valueOf to convert various int types and Boolean to string
        return String.valueOf(convertedValue);
      case BYTES:
Ok, this looks reasonable, but it also looks like it could change things.
What all does this interact with?
I thought the same. I am not sure why this part didn't wrap the bytes into a ByteBuffer before, given that KSQL uses ByteBuffer for the BYTES column type. So far, only the AvroDataTranslator was failing on this test, but I updated it. I also ran some QTT tests, and ran all formats (avro, json, json-sr, proto, delimited) manually with simple, array, map, and struct bytes; none of them failed. So I suspect this code was simply missed when the BYTES type was supported.
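The fix under discussion can be illustrated with a small, self-contained sketch. The helper name `toKsqlBytes` and the pass-through behavior are assumptions for illustration; the real logic lives in the `BYTES` branch of KSQL's `toKsqlValue`:

```java
import java.nio.ByteBuffer;

public class BytesCoercionSketch {

    // Hypothetical helper mirroring the BYTES case of toKsqlValue:
    // KSQL represents BYTES columns as ByteBuffer, so a raw byte[]
    // produced by the JSON decoder must be wrapped before returning.
    static Object toKsqlBytes(final Object convertedValue) {
        if (convertedValue instanceof byte[]) {
            return ByteBuffer.wrap((byte[]) convertedValue);
        }
        // Already a ByteBuffer (or null) - pass through unchanged.
        return convertedValue;
    }

    public static void main(String[] args) {
        final Object wrapped = toKsqlBytes(new byte[]{1, 2, 3});
        System.out.println(wrapped instanceof ByteBuffer); // prints "true"
    }
}
```

`ByteBuffer.wrap` shares the underlying array rather than copying it, which is why it is a cheap, safe coercion at this point in the translation.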
Is this related to JsonSchemaConverter? If not, I guess we can separate this out to a different PR. The JsonSchemaConverter changes are already complex, and we probably don't want to mix in other fixes if not necessary.
Again, if this is not related to this PR, we can separate it out to another PR
-class JsonSchemaTranslator implements ConnectSchemaTranslator {
+public class JsonSchemaTranslator implements ConnectSchemaTranslator {

   private final JsonSchemaData jsonData = new JsonSchemaData();
Is there any discussion we need to have about making this class public?
I need this class for the QTT tests. Proto and Avro schema translators are also public for QTT.
ksqldb-serde/src/main/java/io/confluent/ksql/serde/json/KsqlJsonSerdeFactory.java (outdated, resolved)
@@ -131,22 +130,16 @@ public class KsqlJsonDeserializerTest {
      .addSerializer(java.sql.Date.class, new EpochDaySerializer())
  );

  @Parameters(name = "{0}")
  public static Collection<Object[]> data() {
    return Arrays.asList(new Object[][]{{"Plain JSON", false}, {"Magic byte prefixed", true}});
Ok, so do we still test both of these options/parameters?
If not, why?
Not anymore. This test class was testing both JSON and JSON_SR. The new JsonSchemaConverter makes roughly 95% of these tests inapplicable. I created a new KsqlJsonSchemaDeserializerTest class for that, and I will add more tests to it as I understand JSON_SR better.
@@ -1115,11 +1126,7 @@ private byte[] serializeJson(final Object expected) {
  }

  private byte[] addMagic(final byte[] json) {
    if (useSchemas) {
Ok, looks like we are testing only without schemas now?
The majority of these tests do not apply anymore. The JsonSchemaConverter validates against the SR schema before serializing or deserializing; these tests are making sure those validations work for plain JSON. It's hard to make the tests work for JSON_SR. For instance, I cannot force a Connect schema derived from a JSON-SR schema to have a field that does not exist; the Connect schema object fails. So I decided to remove every useSchemas case from this class and write my own tests in KsqlJsonSchemaDeserializerTest.
Force-pushed from d268661 to 53f56b1.
@@ -87,7 +87,7 @@
    "valueSchema" : {
      "type" : "object",
      "properties" : {
        "COL1" : {
This plan and the others related to key inference do not apply to JsonSchemaConverter anymore. The QTT tests were failing when comparing the JSON-SR schemas. However, I tried running the same test in Prod and things work fine.
I created a JSON-SR schema with a COL1 INTEGER field:
Schema
{
"properties": {
"COL1": {
"description": "The integer type is used for integral numbers.",
"type": "integer"
}
},
"type": "object"
}
I then created a stream with a FOO INTEGER column using the above schema, then inserted into and selected from the stream, which worked:
ksql> create stream json(foo INT) with (kafka_topic='json', value_format='JSON_SR');
Message
----------------
Stream created
----------------
ksql> insert into json(foo) values(1);
ksql> select * from json;
+------+
|FOO   |
+------+
|1     |
Query Completed
I noticed the SR schema changed to the following after inserting the values:
{
"properties": {
"FOO": {
"connect.index": 0,
"oneOf": [
{
"type": "null"
},
{
"connect.type": "int32",
"type": "integer"
}
]
}
},
"type": "object"
}
Perhaps the above change (renaming the field) is compatible with JSON_SR only? I tried with Protobuf/Avro schemas and they do not allow the QTT test case to pass.
I modified the QTT plans to make QTT work, because Proto/Avro do not allow such a test while JSON_SR allowed me to do it in Production. I don't think there will be an incompatibility problem.
Does this mean that after this change, insertion into an existing json_sr stream won't work because of an incompatible schema?
Is it because json_sr backward compatibility check is very loose?
{ "properties": { "FOO": { "connect.index": 0, "oneOf": [ { "type": "null" }, { "connect.type": "int32", "type": "integer" } ] } }, "type": "object" }
is compatible with
{ "properties": { "COL1": { "description": "The integer type is used for integral numbers.", "type": "integer" } }, "type": "object" }
How about you change FOO's type to something other than integer? cc @rayokota
Synced offline: why col1 was there previously was a mystery, but FOO is correct now. @spena also tested in prod and there's no issue with col1. So ignore this...
    } ]
  }
}
This is another incompatibility issue with QTT.
The JSON_SR schema has an ARRAY field wrapped in another FOO object, which should be STRUCT<FOO ARRAY<STRING>>. When creating a JSON-SR stream with WRAP_SINGLE_VALUE=false, the internal array is unwrapped and used in the stream as FOO ARRAY<STRING> only.
However, JSON_SR with JsonSchemaConverter does not work like that. JSON_SR supports anonymous fields by registering a schema without a field name. This makes the SR API understand that the record is anonymous and has no field. When attempting to deserialize an anonymous record using the SR schema with the wrapped array, deserialization fails because the schema is not the one used during serialization. Before this PR, however, JSON_SR deserialization was done with the JSON mapper (no JsonSchemaConverter), which ignores the SR schema; it deserialized using the KSQL schema, so everything looked good. Running INSERT VALUES fails, though, even before this PR, because serialization uses the JsonSchemaConverter.
I found the above incompatibility only in QTT, though. If I run the tests in Production, when I serialize a value with wrapped fields, the JsonSchemaConverter adds the SR schema ID to the record for future reference when deserializing. When deserializing with JsonSchemaConverter, the API looks up the right schema ID, deserializes using that schema, and passes the result to KSQL, which unwraps the field.
So the test case is compatible in Prod but not in QTT. This is another kind of behavior that QTT fails to validate correctly. I think we should fix QTT to use the Prod serializers instead of its own ones.
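To make the wrapped vs. unwrapped distinction concrete, here is a rough sketch of the two schema shapes being discussed (illustrative only, not copied from the PR): first the wrapped form backing STRUCT<FOO ARRAY<STRING>>, then the anonymous form used when WRAP_SINGLE_VALUE=false, where the schema has no field name at all.

```json
{
  "type": "object",
  "properties": {
    "FOO": { "type": "array", "items": { "type": "string" } }
  }
}

{ "type": "array", "items": { "type": "string" } }
```

A record serialized against one shape cannot be deserialized against the other, which is why schema-ID lookup matters in Prod.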
I think the old post schema is wrong. If wrap_single_value=false, then FOO shouldn't be in the schema. I guess the reason FOO was in SR in QTT previously is that the schema was registered by QTT manually? Now the ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/serde/json/ValueSpecJsonSchemaSerdeSupplier.java you created will register the correct schema again during insert?
Thanks @spena
-      final Class<T> targetType
+      final Class<T> targetType,
+      final ConnectDataTranslator dataTranslator,
+      final Converter converter
  ) {
    return new KsqlJsonDeserializer<>(
Can you return KsqlConnectDeserializer for the json_sr case now?
Done
    final Object coerced;
    if (isJsonSchema) {
      final SchemaAndValue schemaAndValue = converter.toConnectData(topic, bytes);
      coerced = translator.toKsqlRow(schemaAndValue.schema(), schemaAndValue.value());
If you return KsqlConnectDeserializer in the factory, this logic can be removed and you don't need converter and translator?
Done
@@ -163,6 +168,9 @@ private static Converter getSchemaConverter(
    }
    config.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, DecimalFormat.NUMERIC.name());

    // Makes naming of unions consistent between all SR formats (i.e. connect_union_field_0)
    config.put(JsonSchemaDataConfig.GENERALIZED_SUM_TYPE_SUPPORT_CONFIG, true);
Curious, does this have any backward-compatibility issues?
Does https://github.com/confluentinc/ksql/blob/master/ksqldb-serde/src/main/java/io/confluent/ksql/serde/json/JsonSchemaTranslator.java#L48 also need this config? I think this is called when we first register the schema in SR for ct/cs.
I needed it for the oneOf support in JSON_SR. I removed it from this PR to reduce some complexity. I will add more cases when testing multiple JSON schema events.
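For context, a union column registered under generalized sum type support might look roughly like the following (an illustrative sketch, not a schema from this PR; the `connect.type` annotation is the Connect marker for integer width, as seen in the FOO schema earlier in this thread):

```json
{
  "type": "object",
  "properties": {
    "VAL": {
      "oneOf": [
        { "type": "null" },
        { "type": "string" },
        { "connect.type": "int32", "type": "integer" }
      ]
    }
  }
}
```

The config makes the Connect field names generated for such unions (e.g. `connect_union_field_0`) consistent across the SR formats.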
  // Do not let the serializer auto-register a new schema on JSON_SR formats.
  // QTT tests with JSON_SR started failing on historic plans after supporting the
  // JsonSchemaConverter because the converter was auto-registering a schema with extra
  // properties. These new props are not required, but were causing noise in the historic plans.
Prod code still registers a new schema for the json_sr format when you insert, so why don't we have a problem there?
It's caused by the historical plans with wrap singles = false. For instance, in 6.1, the SR schema used by the historical plan is FOO STRUCT<ARRAY<STRING>>; but when wrap_single_value=false is used, KSQL will attempt to serialize the schema ARRAY<STRING>, which the SR API will try to register. SR will fail because that schema is not compatible with the latest schema, which has a STRUCT.
All this is caused by QTT. I can explain in more detail offline how QTT was working before. For QTT to work, I had to prevent QTT from registering the schema, since it was already registered before.
Is it possible to update the historic plans to include the extra fields instead of adding the special logic? I'm not sure why we don't use production code to do the serialization, but I guess we should better mimic what prod is doing...
  // This makes JSON_SR output compatible with old QTT tests that used to deserialize
  // JSON_SR with a JSON object mapper instead of the ConnectSerdeSupplier.
  // The ConnectSerdeSupplier is adding the missing fields with null values even if they're
QQ: why is the field serialized into the map if it's not in the schema? I might be missing something...
This is what the ConnectSerdeSupplier returns. I was lost on this part for some time. After debugging, I found that QTT creates a Map with the columns and their deserialized values. This part is special to QTT.
So ConnectSerdeSupplier deserializes the map and creates more fields than are in the original schema? This doesn't sound right, do you have examples?
    // Then:
    assertThat(result, is(new BigDecimal("10.0")));
  }
}
Should they be moved somewhere else instead of deleted?
I moved it to ValueSpecJsonSchemaSerdeSupplierTest
@@ -904,65 +896,6 @@ public void shouldThrowIfCanNotCoerceMapValue() {
        "Can't convert type. sourceType: BooleanNode, requiredType: INTEGER"))));
  }

  @Test
  public void shouldThrowOnMapSchemaWithNonStringKeys() {
These tests are available already in KsqlJsonSerdeFactoryTest.java
Force-pushed from 6f6b241 to b6d3f28.
@lihaosky I added a feature flag in the latest commit to disable the JsonSchemaConverter on deserialization, in case users find a compatibility issue with JSON_SR. I reverted the KsqlJsonDeserializer changes to keep supporting both JSON and JSON_SR. It will be called for JSON_SR only when the feature is disabled.
Thanks @spena ! Looks good overall except some QTT questions.
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
public class ValueSpecJsonSchemaSerdeSupplierTest {
Can you add test for the special logic deserializing map if that's needed? Even though I'm still not sure why that's needed...
I removed this code now. After changing the QTT historical plans in the previous reviews, I found that I don't need these changes anymore.
    converter = new JsonSchemaConverter(schemaRegistryClient);
    converter.configure(ImmutableMap.of(
Where's this converter used?
  }

  @Test
  public void shouldDeserializeJsonObjectCorrectly() {
Can you add a test for validateSchema failure?
Can you also test when KSQL_JSON_SR_CONVERTER_DESERIALIZER_ENABLED is enabled or disabled?
The validateSchema test was already in KsqlJsonSerdeFactoryTest. I added the feature-flag test too.
    final Struct result = deserializer.deserialize(SOME_TOPIC, bytes);

    // Then:
    assertThat(result, is(expectedOrder));
Is expectedOrder the same as AN_ORDER? Why not use AN_ORDER?
I changed it to AN_ORDER.
Force-pushed from 443b89d to 83f3aaf.
@lihaosky I addressed all the comments in the latest 2 commits. The latest one has a bunch of QTT plans updated so I could remove the code from your comments. The removal only included
Thanks @spena ! LGTM overall just a few comments.
Can you also comment on https://github.com/confluentinc/ksql/pull/9130/files#r922437085? I don't fully understand that.
        "connect.index": 0,
        "connect.type": "int64"
Is this compatible with a schema without connect metadata? A good test might be:
- Create a stream with json_sr format in SR without this fix.
- Insert into the stream.
- With this fix, insert into the stream again. Since insertion will try to register the schema with connect metadata now, we can see if it's compatible with the old schema.
I'm ok if you verify this manually.
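The manual check being proposed might look like the following ksql session (stream, topic, and column names are made up for illustration):

```sql
-- Before the fix: create and populate a JSON_SR stream
CREATE STREAM orders (id INT, amount DOUBLE)
  WITH (kafka_topic='orders', value_format='JSON_SR');
INSERT INTO orders (id, amount) VALUES (1, 10.5);

-- After upgrading to a build with the fix: insert again.
-- The insert now tries to register a schema carrying connect
-- metadata, so SR's compatibility check is exercised.
INSERT INTO orders (id, amount) VALUES (2, 20.0);
```

If the second insert succeeds and the subject's schema history in SR stays compatible, the concern is addressed.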
I ran it. The schema does not change. It seems this is another of those QTT-only special cases. I registered the input schema as specified in the elements.json file:
{
"properties": {
"c1": {
"type": "integer"
}
},
"type": "object"
}
I then created the input stream, inserted data, and selected data, both with and without the fix. The SR schema didn't change; it is the same as registered (like elements.json). I created the output stream and the new schema has the connect.type, which is expected.
I'm not sure what QTT is doing with historical plans, or why it expects the connect.type on the input topic. Maybe it registers the schema in SR, that adds the new property, and then it expects it? I need to debug more, but it is only QTT. Prod works as expected.
I figured out why QTT behaves differently. Before this fix, QTT was using the plain JSON serializer, which does not handle the SR schema; it just uses the one from the historical plan and serializes the record.
With the fix, I had to change QTT to use the JsonSchemaConverter (which is used in Prod). In Prod, when the schema ID is present, it does not register the schema again; but in QTT we register it, because the ConnectSerdeSupplier does not add the schema ID to the converter. That's what the commit I reverted used to do (add the schema ID to prevent registering the schema).
So, I either re-revert the commit that used to do that so the historical plans are unchanged, or we stick with these changes. What do you think?
We can stick to the current code then. Adding logic in the test framework just to make a test work doesn't seem right... even the test framework logic isn't what prod code is doing. cc @suhas-satish, @colinhicks
      );
    } else {
      if (useSchemaRegistryFormat) {
        LOG.info("The JsonSchemaConverter for deserialization is disabled for JSON_SR.");
Remove this log? It's possible this serde will be called many times, and this log can spam the logs. I guess this can be deduced from the value of KSQL_JSON_SR_CONVERTER_DESERIALIZER_ENABLED, which might be logged as config.
I removed it. I was hesitant to add it in the first place. But you're right, we can deduce this by looking at the config.
Force-pushed from 83f3aaf to 98dacb9.
- return KsqlConnectDeserializer instead of KsqlJsonDeserializer
- remove GENERALIZED_SUM_TYPE_SUPPORT_CONFIG for this PR (will do a follow-up PR)
- move test from ValueSpecJsonSerdeSupplierTest to ValueSpecJsonSchemaSerdeSupplierTest
- add KsqlJsonSerdeFactory tests with flag enabled & disabled
- remove unnecessary Map removal fields from ValueSpecJsonSchemaSerdeSupplier
- remove unnecessary code from KsqlJsonSchemaDeserializerTest
- remove unnecessary (perhaps spam) log
Force-pushed from 98dacb9 to eb053ac.
Description

Fixes #8526

The JsonSchemaConverter is currently used for JSON_SR serialization in KSQL. Plain JSON serdes are used for deserialization, which does not support the anyOf types already supported by JSON in SR. This PR uses the JsonSchemaConverter for deserialization too, in order to allow anyOf types.

The reason for not using the JsonSchemaConverter for deserialization before was an issue with decimals, which is already fixed in SR.

A feature flag, ksql.json_sr.converter.deserializer.enabled, is added to KSQL and set to true by default. There are no expected compatibility issues, but if users hit problems with the new deserializer, they can set the flag to false in order to continue working with their queries.
in order to continue working with their queries.Testing done
Reviewer checklist