From 6f6b24146fe6da8e34095681933166f8967a25ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20Pe=C3=B1a?= Date: Wed, 13 Jul 2022 12:02:49 -0500 Subject: [PATCH] Address Hao's comments - return KsqlConnectDeserializer instead of KsqlJsonDeserializer - remove GENERALIZED_SUM_TYPE_SUPPORT_CONFIG for this PR (will do a follow-up PR) - move test from ValueSpecJsonSerdeSupplierTest to ValueSpecJsonSchemaSerdeSupplierTest --- .../ValueSpecJsonSchemaSerdeSupplierTest.java | 78 +++++++++++++++++++ .../ksql/serde/json/KsqlJsonDeserializer.java | 56 ++----------- .../ksql/serde/json/KsqlJsonSerdeFactory.java | 49 +++++++++--- .../serde/json/KsqlJsonDeserializerTest.java | 74 +----------------- 4 files changed, 125 insertions(+), 132 deletions(-) create mode 100644 ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/serde/json/ValueSpecJsonSchemaSerdeSupplierTest.java diff --git a/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/serde/json/ValueSpecJsonSchemaSerdeSupplierTest.java b/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/serde/json/ValueSpecJsonSchemaSerdeSupplierTest.java new file mode 100644 index 000000000000..eca6576a4aa3 --- /dev/null +++ b/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/serde/json/ValueSpecJsonSchemaSerdeSupplierTest.java @@ -0,0 +1,78 @@ +package io.confluent.ksql.test.serde.json; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableMap; +import io.confluent.kafka.schemaregistry.client.SchemaMetadata; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; +import io.confluent.kafka.schemaregistry.json.JsonSchema; +import io.confluent.ksql.name.ColumnName; +import io.confluent.ksql.schema.ksql.LogicalSchema; +import io.confluent.ksql.schema.ksql.types.SqlDecimal; 
+import io.confluent.ksql.serde.SerdeFeatures; +import java.io.IOException; +import java.math.BigDecimal; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class ValueSpecJsonSchemaSerdeSupplierTest { + @Mock + private SchemaRegistryClient srClient; + + @Test + public void shouldSerializeAndDeserializeDecimalsWithOutStrippingTrailingZeros() throws RestClientException, IOException { + // Given: + final ValueSpecJsonSchemaSerdeSupplier srSerde = new ValueSpecJsonSchemaSerdeSupplier( + LogicalSchema.builder() + .valueColumn(ColumnName.of("B"), SqlDecimal.of(3, 1)) + .build(), + SerdeFeatures.of() + ); + + final Serializer serializer = srSerde.getSerializer(srClient, false); + final Deserializer deserializer = srSerde.getDeserializer(srClient, false); + + when(srClient.getLatestSchemaMetadata("t-value")) + .thenReturn(new SchemaMetadata(0, 1, "")); + when(srClient.getSchemaBySubjectAndId("t-value", 0)) + .thenReturn(new JsonSchema("{\n" + + " \"properties\": {\n" + + " \"B\": {\n" + + " \"connect.index\": 0,\n" + + " \"oneOf\": [\n" + + " {\n" + + " \"type\": \"null\"\n" + + " },\n" + + " {\n" + + " \"connect.parameters\": {\n" + + " \"connect.decimal.precision\": \"3\",\n" + + " \"scale\": \"1\"\n" + + " },\n" + + " \"connect.type\": \"bytes\",\n" + + " \"connect.version\": 1,\n" + + " \"title\": \"org.apache.kafka.connect.data.Decimal\",\n" + + " \"type\": \"number\"\n" + + " }\n" + + " ]\n" + + " }\n" + + " },\n" + + " \"type\": \"object\"\n" + + "}")); + + // When: + final byte[] bytes = serializer.serialize("t", + ImmutableMap.of("B", new BigDecimal("10.0"))); + + // Then: + assertThat(deserializer.deserialize("t", bytes), + is(ImmutableMap.of("B", new BigDecimal("10.0")))); + } +} diff --git 
a/ksqldb-serde/src/main/java/io/confluent/ksql/serde/json/KsqlJsonDeserializer.java b/ksqldb-serde/src/main/java/io/confluent/ksql/serde/json/KsqlJsonDeserializer.java index b94451bb2218..b367902187e8 100644 --- a/ksqldb-serde/src/main/java/io/confluent/ksql/serde/json/KsqlJsonDeserializer.java +++ b/ksqldb-serde/src/main/java/io/confluent/ksql/serde/json/KsqlJsonDeserializer.java @@ -30,11 +30,8 @@ import com.fasterxml.jackson.databind.node.TextNode; import com.fasterxml.jackson.databind.ser.std.DateSerializer; import com.google.common.collect.ImmutableMap; -import io.confluent.ksql.schema.connect.SchemaWalker; -import io.confluent.ksql.schema.connect.SchemaWalker.Visitor; import io.confluent.ksql.schema.connect.SqlSchemaFormatter; import io.confluent.ksql.serde.SerdeUtils; -import io.confluent.ksql.serde.connect.DataTranslator; import io.confluent.ksql.util.DecimalUtil; import io.confluent.ksql.util.KsqlException; import java.io.IOException; @@ -55,12 +52,10 @@ import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Schema.Type; -import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.data.Time; import org.apache.kafka.connect.data.Timestamp; -import org.apache.kafka.connect.storage.Converter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -92,24 +87,15 @@ public class KsqlJsonDeserializer implements Deserializer { .build(); private final Schema schema; - private final boolean isJsonSchema; private final Class targetType; - private final Converter converter; - private final DataTranslator translator; private String target = "?"; KsqlJsonDeserializer( final Schema schema, - final boolean isJsonSchema, - final Class targetType, - final Converter converter, - final DataTranslator translator + final Class targetType ) { - this.schema = 
validateSchema(Objects.requireNonNull(schema, "schema")); - this.isJsonSchema = isJsonSchema; + this.schema = Objects.requireNonNull(schema, "schema"); this.targetType = Objects.requireNonNull(targetType, "targetType"); - this.converter = Objects.requireNonNull(converter, "converter"); - this.translator = Objects.requireNonNull(translator, "translator"); SerdeUtils.throwOnSchemaJavaTypeMismatch(schema, targetType); } @@ -126,18 +112,11 @@ public T deserialize(final String topic, final byte[] bytes) { return null; } - final Object coerced; - if (isJsonSchema) { - final SchemaAndValue schemaAndValue = converter.toConnectData(topic, bytes); - coerced = translator.toKsqlRow(schemaAndValue.schema(), schemaAndValue.value()); - } else { - final JsonNode jsonNode = MAPPER.readTree(bytes); - coerced = enforceFieldType( - "$", - new JsonValueContext(jsonNode, schema) - ); - } - + final JsonNode jsonNode = MAPPER.readTree(bytes); + final Object coerced = enforceFieldType( + "$", + new JsonValueContext(jsonNode, schema) + ); if (LOG.isTraceEnabled()) { LOG.trace("Deserialized {}. 
topic:{}, row:{}", target, topic, coerced); @@ -367,25 +346,4 @@ public String getPath() { return path; } } - - private static Schema validateSchema(final Schema schema) { - - class SchemaValidator implements Visitor { - - @Override - public Void visitMap(final Schema mapSchema, final Void key, final Void value) { - if (mapSchema.keySchema().type() != Type.STRING) { - throw new KsqlException("JSON only supports MAP types with STRING keys"); - } - return null; - } - - public Void visitSchema(final Schema schema11) { - return null; - } - } - - SchemaWalker.visit(schema, new SchemaValidator()); - return schema; - } } \ No newline at end of file diff --git a/ksqldb-serde/src/main/java/io/confluent/ksql/serde/json/KsqlJsonSerdeFactory.java b/ksqldb-serde/src/main/java/io/confluent/ksql/serde/json/KsqlJsonSerdeFactory.java index 3cb5df4205d3..b9ddbfb7a0dd 100644 --- a/ksqldb-serde/src/main/java/io/confluent/ksql/serde/json/KsqlJsonSerdeFactory.java +++ b/ksqldb-serde/src/main/java/io/confluent/ksql/serde/json/KsqlJsonSerdeFactory.java @@ -19,16 +19,18 @@ import com.google.errorprone.annotations.Immutable; import io.confluent.connect.json.JsonSchemaConverter; import io.confluent.connect.json.JsonSchemaConverterConfig; -import io.confluent.connect.json.JsonSchemaDataConfig; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; +import io.confluent.ksql.schema.connect.SchemaWalker; import io.confluent.ksql.serde.SerdeFactory; import io.confluent.ksql.serde.SerdeUtils; import io.confluent.ksql.serde.connect.ConnectDataTranslator; import io.confluent.ksql.serde.connect.ConnectSRSchemaDataTranslator; +import io.confluent.ksql.serde.connect.KsqlConnectDeserializer; import io.confluent.ksql.serde.connect.KsqlConnectSerializer; import io.confluent.ksql.serde.tls.ThreadLocalSerializer; import io.confluent.ksql.util.KsqlConfig; +import io.confluent.ksql.util.KsqlException; import java.util.Map; 
import java.util.Objects; import java.util.Optional; @@ -71,6 +73,8 @@ public Serde createSerde( final Class targetType, final boolean isKey ) { + validateSchema(schema); + final Optional physicalSchema; if (useSchemaRegistryFormat) { physicalSchema = properties.getSchemaId().isPresent() ? Optional.of( @@ -130,13 +134,18 @@ private Deserializer createDeserializer( final ConnectDataTranslator dataTranslator, final Converter converter ) { - return new KsqlJsonDeserializer<>( - schema, - useSchemaRegistryFormat, - targetType, - converter, - dataTranslator - ); + if (useSchemaRegistryFormat) { + return new KsqlConnectDeserializer<>( + converter, + dataTranslator, + targetType + ); + } else { + return new KsqlJsonDeserializer<>( + schema, + targetType + ); + } } private static Converter getConverter() { @@ -168,12 +177,30 @@ private static Converter getSchemaRegistryConverter( } config.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, DecimalFormat.NUMERIC.name()); - // Makes naming of unions consistent between all SR formats (i.e. 
connect_union_field_0) - config.put(JsonSchemaDataConfig.GENERALIZED_SUM_TYPE_SUPPORT_CONFIG, true); - final Converter converter = new JsonSchemaConverter(schemaRegistryClient); converter.configure(config, isKey); return converter; } + + private static Schema validateSchema(final Schema schema) { + + class SchemaValidator implements SchemaWalker.Visitor { + + @Override + public Void visitMap(final Schema mapSchema, final Void key, final Void value) { + if (mapSchema.keySchema().type() != Schema.Type.STRING) { + throw new KsqlException("JSON only supports MAP types with STRING keys"); + } + return null; + } + + public Void visitSchema(final Schema schema11) { + return null; + } + } + + SchemaWalker.visit(schema, new SchemaValidator()); + return schema; + } } diff --git a/ksqldb-serde/src/test/java/io/confluent/ksql/serde/json/KsqlJsonDeserializerTest.java b/ksqldb-serde/src/test/java/io/confluent/ksql/serde/json/KsqlJsonDeserializerTest.java index 851ee8021dfa..039553e49349 100644 --- a/ksqldb-serde/src/test/java/io/confluent/ksql/serde/json/KsqlJsonDeserializerTest.java +++ b/ksqldb-serde/src/test/java/io/confluent/ksql/serde/json/KsqlJsonDeserializerTest.java @@ -41,7 +41,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import io.confluent.ksql.serde.SerdeUtils; -import io.confluent.ksql.serde.connect.ConnectDataTranslator; import io.confluent.ksql.serde.connect.ConnectKsqlSchemaTranslator; import io.confluent.ksql.util.DecimalUtil; import io.confluent.ksql.util.KsqlException; @@ -63,8 +62,6 @@ import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.data.Time; import org.apache.kafka.connect.data.Timestamp; -import org.apache.kafka.connect.json.JsonConverter; -import org.apache.kafka.connect.storage.Converter; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -132,14 +129,9 @@ public class KsqlJsonDeserializerTest { private Struct expectedOrder; private 
KsqlJsonDeserializer deserializer; - private Converter converter; - private ConnectDataTranslator dataTranslator; @Before public void before() throws Exception { - converter = new JsonConverter(); - dataTranslator = new ConnectDataTranslator(ORDER_SCHEMA); - expectedOrder = new Struct(ORDER_SCHEMA) .put(ORDERTIME, 1511897796092L) .put(ORDERID, 1L) @@ -904,65 +896,6 @@ public void shouldThrowIfCanNotCoerceMapValue() { "Can't convert type. sourceType: BooleanNode, requiredType: INTEGER")))); } - @Test - public void shouldThrowOnMapSchemaWithNonStringKeys() { - // Given: - final ConnectSchema schema = (ConnectSchema) SchemaBuilder - .struct() - .field("f0", SchemaBuilder - .map(Schema.OPTIONAL_INT32_SCHEMA, Schema.INT32_SCHEMA) - .optional() - .build()) - .build(); - - // When: - final Exception e = assertThrows( - KsqlException.class, - () -> new KsqlJsonDeserializer<>( - schema, - false, - Struct.class, - converter, - dataTranslator - ) - ); - - // Then: - assertThat(e.getMessage(), containsString( - "JSON only supports MAP types with STRING keys")); - } - - @Test - public void shouldThrowOnNestedMapSchemaWithNonStringKeys() { - // Given: - final ConnectSchema schema = (ConnectSchema) SchemaBuilder - .struct() - .field("f0", SchemaBuilder - .struct() - .field("f1", SchemaBuilder - .map(Schema.OPTIONAL_INT32_SCHEMA, Schema.INT32_SCHEMA) - .optional() - .build()) - .build()) - .build(); - - // When: - final Exception e = assertThrows( - KsqlException.class, - () -> new KsqlJsonDeserializer<>( - schema, - false, - Struct.class, - converter, - dataTranslator - ) - ); - - // Then: - assertThat(e.getMessage(), containsString( - "JSON only supports MAP types with STRING keys")); - } - @Test public void shouldIncludeTopicNameInException() { // Given: @@ -1109,11 +1042,8 @@ private KsqlJsonDeserializer givenDeserializerForSchema( final Class type ) { return new KsqlJsonDeserializer<>( - (ConnectSchema) schema, - false, - type, - converter, - dataTranslator + schema, + type 
); }