Skip to content

Commit

Permalink
fix: use JsonSchemaConverter to support JSON anyOf types (#9130)
Browse files Browse the repository at this point in the history
  • Loading branch information
spena authored Jul 21, 2022
1 parent 5770294 commit 798c12d
Show file tree
Hide file tree
Showing 60 changed files with 628 additions and 372 deletions.
19 changes: 19 additions & 0 deletions ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,18 @@ public class KsqlConfig extends AbstractConfig {
+ "that contains headers columns will work with the headers functionality to prevent "
+ "a degraded command topic situation when restarting ksqlDB.";

/**
 * Config key of the feature flag that selects the JsonSchemaConverter class
 * for deserializing JSON_SR records.
 */
public static final String KSQL_JSON_SR_CONVERTER_DESERIALIZER_ENABLED
= "ksql.json_sr.converter.deserializer.enabled";

// Default value for the flag: the JsonSchemaConverter deserializer is enabled.
private static final Boolean KSQL_JSON_SR_CONVERTER_DESERIALIZER_ENABLED_DEFAULT = true;

// Description of the flag, passed to the config definition alongside the key
// and default. It advises turning the flag off only when the converter causes
// deserialization issues.
private static final String KSQL_JSON_SR_CONVERTER_DESERIALIZER_ENABLED_DOC = ""
+ "Feature flag that enables the use of the JsonSchemaConverter class for deserializing "
+ "JSON_SR records. JsonSchemaConverter is required to support `anyOf` JSON_SR types. "
+ "This flag should be used to disable this feature only when users experience "
+ "deserialization issues caused by the JsonSchemaConverter. Otherwise, this flag should "
+ "remain true to take advantage of the new `anyOf` types and other JSON_SR serde fixes.";

public static final String KSQL_SOURCE_TABLE_MATERIALIZATION_ENABLED =
"ksql.source.table.materialization.enabled";
private static final Boolean KSQL_SOURCE_TABLE_MATERIALIZATION_ENABLED_DEFAULT = true;
Expand Down Expand Up @@ -1470,6 +1482,13 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
Importance.LOW,
KSQL_WEBSOCKET_CONNECTION_MAX_TIMEOUT_MS_DOC
)
.define(
KSQL_JSON_SR_CONVERTER_DESERIALIZER_ENABLED,
Type.BOOLEAN,
KSQL_JSON_SR_CONVERTER_DESERIALIZER_ENABLED_DEFAULT,
Importance.LOW,
KSQL_JSON_SR_CONVERTER_DESERIALIZER_ENABLED_DOC
)
.withClientSslSupport();

for (final CompatibilityBreakingConfigDef compatibilityBreakingConfigDef
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,10 @@ public byte[] serialize(final String topic, final Object spec) {
}

final T schema;
final int id;
try {
final String subject = KsqlConstants.getSRSubject(topic, isKey);
final int id = srClient.getLatestSchemaMetadata(subject).getId();
id = srClient.getLatestSchemaMetadata(subject).getId();
schema = (T) srClient.getSchemaBySubjectAndId(subject, id);
} catch (Exception e) {
throw new KsqlException(e);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2022 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.test.serde.json;

import io.confluent.connect.json.JsonSchemaConverter;
import io.confluent.kafka.schemaregistry.json.JsonSchema;
import io.confluent.ksql.serde.json.JsonSchemaTranslator;
import io.confluent.ksql.test.serde.ConnectSerdeSupplier;
import org.apache.kafka.connect.data.Schema;

/**
 * Serde supplier for the test tooling that handles values described by a
 * {@link JsonSchema}.
 *
 * <p>It hands the {@link JsonSchemaConverter} constructor to the
 * {@link ConnectSerdeSupplier} base class and converts parsed JSON schemas to
 * Connect schemas through a {@link JsonSchemaTranslator}.
 */
public class ValueSpecJsonSchemaSerdeSupplier extends ConnectSerdeSupplier<JsonSchema> {

  // Converts a parsed JSON schema into the equivalent Connect schema.
  private final JsonSchemaTranslator jsonSchemaTranslator = new JsonSchemaTranslator();

  /**
   * Creates the supplier, passing the {@link JsonSchemaConverter} constructor
   * reference to the base class.
   */
  public ValueSpecJsonSchemaSerdeSupplier() {
    super(JsonSchemaConverter::new);
  }

  /**
   * Translates the given parsed JSON schema into a Connect schema.
   *
   * @param schema the parsed JSON schema
   * @return the Connect schema produced by the translator
   */
  @Override
  protected Schema fromParsedSchema(final JsonSchema schema) {
    final Schema connectSchema = jsonSchemaTranslator.toConnectSchema(schema);
    return connectSchema;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableList;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.serde.json.JsonSerdeUtils;
import io.confluent.ksql.test.serde.SerdeSupplier;
import java.math.BigDecimal;
import java.math.BigInteger;
Expand All @@ -33,7 +32,6 @@
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;

Expand All @@ -45,14 +43,11 @@ public class ValueSpecJsonSerdeSupplier implements SerdeSupplier<Object> {

private static final ObjectMapper FLOAT_MAPPER = new ObjectMapper();

private final boolean useSchemas;
private final ObjectMapper mapper;

public ValueSpecJsonSerdeSupplier(
final boolean useSchemas,
final Map<String, Object> properties
) {
this.useSchemas = useSchemas;
mapper = (boolean) (properties.getOrDefault("use.exact.numeric.comparison", true))
? MAPPER : FLOAT_MAPPER;
}
Expand Down Expand Up @@ -90,13 +85,7 @@ public byte[] serialize(final String topicName, final Object spec) {
try {
final Object toSerialize = Converter.toJsonNode(spec);
final byte[] bytes = mapper.writeValueAsBytes(toSerialize);
if (!useSchemas) {
return bytes;
}

return ArrayUtils.addAll(
new byte[]{/*magic*/ 0x00, /*schemaID*/ 0x00, 0x00, 0x00, 0x01},
bytes);
return bytes;
} catch (final Exception e) {
throw new RuntimeException(e);
}
Expand All @@ -118,9 +107,7 @@ public Object deserialize(final String topicName, final byte[] data) {
return null;
}
try {
return useSchemas
? JsonSerdeUtils.readJsonSR(data, mapper, Object.class)
: mapper.readValue(data, Object.class);
return mapper.readValue(data, Object.class);
} catch (final Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,12 @@ public Serializer<Object> getKeySerializer(final Map<String, Object> properties)
@SuppressWarnings({"unchecked", "rawtypes"})
public Serializer<Object> getValueSerializer(final Map<String, Object> properties) {
final SerdeSupplier<?> valueSerdeSupplier = SerdeUtil
.getSerdeSupplier(valueFormat.getFormatInfo(), schema, properties);
.getSerdeSupplier(
valueFormat.getFormatInfo(),
schema,
properties,
valueFormat.getFeatures()
);

final Serializer<?> serializer = valueSerdeSupplier.getSerializer(srClient, false);

Expand Down Expand Up @@ -466,7 +471,12 @@ public Deserializer<?> getKeyDeserializer(final Map<String, Object> properties)

public Deserializer<?> getValueDeserializer(final Map<String, Object> properties) {
final SerdeSupplier<?> valueSerdeSupplier = SerdeUtil
.getSerdeSupplier(valueFormat.getFormatInfo(), schema, properties);
.getSerdeSupplier(
valueFormat.getFormatInfo(),
schema,
properties,
valueFormat.getFeatures()
);

final Deserializer<?> deserializer = valueSerdeSupplier.getDeserializer(srClient, false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.confluent.ksql.serde.FormatFactory;
import io.confluent.ksql.serde.FormatInfo;
import io.confluent.ksql.serde.KeyFormat;
import io.confluent.ksql.serde.SerdeFeatures;
import io.confluent.ksql.serde.avro.AvroFormat;
import io.confluent.ksql.serde.delimited.DelimitedFormat;
import io.confluent.ksql.serde.json.JsonFormat;
Expand All @@ -42,6 +43,7 @@
import io.confluent.ksql.serde.protobuf.ProtobufProperties;
import io.confluent.ksql.test.serde.SerdeSupplier;
import io.confluent.ksql.test.serde.avro.ValueSpecAvroSerdeSupplier;
import io.confluent.ksql.test.serde.json.ValueSpecJsonSchemaSerdeSupplier;
import io.confluent.ksql.test.serde.json.ValueSpecJsonSerdeSupplier;
import io.confluent.ksql.test.serde.kafka.KafkaSerdeSupplier;
import io.confluent.ksql.test.serde.none.NoneSerdeSupplier;
Expand Down Expand Up @@ -71,7 +73,8 @@ private SerdeUtil() {
public static SerdeSupplier<?> getSerdeSupplier(
final FormatInfo formatInfo,
final LogicalSchema schema,
final Map<String, Object> properties
final Map<String, Object> properties,
final SerdeFeatures serdeFeatures
) {
final Format format = FormatFactory.of(formatInfo);
switch (format.name()) {
Expand All @@ -81,8 +84,8 @@ public static SerdeSupplier<?> getSerdeSupplier(
new ProtobufProperties(formatInfo.getProperties()));
case ProtobufNoSRFormat.NAME:
return new ValueSpecProtobufNoSRSerdeSupplier(schema, formatInfo.getProperties());
case JsonFormat.NAME: return new ValueSpecJsonSerdeSupplier(false, properties);
case JsonSchemaFormat.NAME: return new ValueSpecJsonSerdeSupplier(true, properties);
case JsonFormat.NAME: return new ValueSpecJsonSerdeSupplier(properties);
case JsonSchemaFormat.NAME: return new ValueSpecJsonSchemaSerdeSupplier();
case DelimitedFormat.NAME: return new StringSerdeSupplier();
case KafkaFormat.NAME: return new KafkaSerdeSupplier(schema);
case NoneFormat.NAME: return new NoneSerdeSupplier();
Expand Down Expand Up @@ -128,7 +131,9 @@ public static <T> SerdeSupplier<?> getKeySerdeSupplier(
final SerdeSupplier<T> inner = (SerdeSupplier<T>) getSerdeSupplier(
keyFormat.getFormatInfo(),
schema,
properties);
properties,
keyFormat.getFeatures()
);

if (!keyFormat.getWindowType().isPresent()) {
return inner;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package io.confluent.ksql.test.serde.json;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableMap;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.schemaregistry.json.JsonSchema;
import java.io.IOException;
import java.math.BigDecimal;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

/**
 * Round-trip test for {@link ValueSpecJsonSchemaSerdeSupplier} against a mocked
 * schema registry client.
 */
@RunWith(MockitoJUnitRunner.class)
public class ValueSpecJsonSchemaSerdeSupplierTest {

  private static final String TOPIC = "t";
  private static final String VALUE_SUBJECT = "t-value";
  private static final int SCHEMA_ID = 0;

  // Object schema with a single nullable decimal field "B" (precision 3, scale 1).
  private static final String NULLABLE_DECIMAL_SCHEMA = "{\n"
      + " \"properties\": {\n"
      + " \"B\": {\n"
      + " \"connect.index\": 0,\n"
      + " \"oneOf\": [\n"
      + " {\n"
      + " \"type\": \"null\"\n"
      + " },\n"
      + " {\n"
      + " \"connect.parameters\": {\n"
      + " \"connect.decimal.precision\": \"3\",\n"
      + " \"scale\": \"1\"\n"
      + " },\n"
      + " \"connect.type\": \"bytes\",\n"
      + " \"connect.version\": 1,\n"
      + " \"title\": \"org.apache.kafka.connect.data.Decimal\",\n"
      + " \"type\": \"number\"\n"
      + " }\n"
      + " ]\n"
      + " }\n"
      + " },\n"
      + " \"type\": \"object\"\n"
      + "}";

  @Mock
  private SchemaRegistryClient srClient;

  @Test
  public void shouldSerializeAndDeserializeDecimalsWithOutStrippingTrailingZeros()
      throws RestClientException, IOException {
    // Given:
    final ValueSpecJsonSchemaSerdeSupplier supplier = new ValueSpecJsonSchemaSerdeSupplier();

    final Serializer<Object> valueSerializer = supplier.getSerializer(srClient, false);
    final Deserializer<Object> valueDeserializer = supplier.getDeserializer(srClient, false);

    // The registry mock serves the decimal schema as the latest value schema of the topic.
    when(srClient.getLatestSchemaMetadata(VALUE_SUBJECT))
        .thenReturn(new SchemaMetadata(SCHEMA_ID, 1, ""));
    when(srClient.getSchemaBySubjectAndId(VALUE_SUBJECT, SCHEMA_ID))
        .thenReturn(new JsonSchema(NULLABLE_DECIMAL_SCHEMA));

    // When:
    final byte[] serialized = valueSerializer.serialize(
        TOPIC,
        ImmutableMap.of("B", new BigDecimal("10.0"))
    );

    // Then:
    final Object roundTripped = valueDeserializer.deserialize(TOPIC, serialized);
    assertThat(roundTripped, is(ImmutableMap.of("B", new BigDecimal("10.0"))));
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@

import com.google.common.collect.ImmutableMap;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.serde.json.JsonSerdeUtils;
import io.confluent.ksql.test.tools.TestJsonMapper;
import java.math.BigDecimal;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
Expand All @@ -39,12 +37,10 @@ public class ValueSpecJsonSerdeSupplierTest {
private SchemaRegistryClient srClient;

private ValueSpecJsonSerdeSupplier plainSerde;
private ValueSpecJsonSerdeSupplier srSerde;

@Before
public void setUp() {
plainSerde = new ValueSpecJsonSerdeSupplier(false, ImmutableMap.of());
srSerde = new ValueSpecJsonSerdeSupplier(true, ImmutableMap.of());
plainSerde = new ValueSpecJsonSerdeSupplier(ImmutableMap.of());
}

@Test
Expand Down Expand Up @@ -72,32 +68,4 @@ public void shouldDeserializeDecimalsWithoutStrippingTrailingZeros_Plain() {
// Then:
assertThat(result, is(new BigDecimal("10.0")));
}

@Test
public void shouldSerializeDecimalsWithOutStrippingTrailingZeros_Sr() throws Exception {
// Given:
final Serializer<Object> serializer = srSerde.getSerializer(srClient, false);

// When:
final byte[] bytes = serializer.serialize("t", new BigDecimal("10.0"));

// Then:
assertThat(JsonSerdeUtils.readJsonSR(bytes, TestJsonMapper.INSTANCE.get(), String.class), is("10.0"));
}

@Test
public void shouldDeserializeDecimalsWithoutStrippingTrailingZeros_Sr() {
// Given:
final Deserializer<Object> deserializer = srSerde.getDeserializer(srClient, false);

final byte[] jsonBytes = "10.0".getBytes(UTF_8);
final byte[] bytes = new byte[jsonBytes.length + JsonSerdeUtils.SIZE_OF_SR_PREFIX];
System.arraycopy(jsonBytes, 0, bytes, JsonSerdeUtils.SIZE_OF_SR_PREFIX, jsonBytes.length);

// When:
final Object result = deserializer.deserialize("t", bytes);

// Then:
assertThat(result, is(new BigDecimal("10.0")));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,7 @@ private static <K> ProducerRecord<byte[], byte[]> producerRecord(
final List<Header> headers
) {
final byte[] serializedKey = keySerializer.serialize("", key);
final byte[] serializeValue = new ValueSpecJsonSerdeSupplier(false, ImmutableMap.of())
final byte[] serializeValue = new ValueSpecJsonSerdeSupplier(ImmutableMap.of())
.getSerializer(null, false)
.serialize("", value);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@
"id" : 42
},
"value" : {
"c1" : 4
"c1" : 4,
"c2" : null
}
} ],
"topics" : [ {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@
"type" : "object",
"properties" : {
"c1" : {
"type" : "integer"
"type" : "integer",
"connect.index": 0,
"connect.type": "int64"
}
}
},
Expand Down Expand Up @@ -95,7 +97,9 @@
"type" : "object",
"properties" : {
"c1" : {
"type" : "integer"
"type" : "integer",
"connect.index": 0,
"connect.type": "int64"
}
}
}
Expand Down
Loading

0 comments on commit 798c12d

Please sign in to comment.