Address Hao's comments
- return KsqlConnectDeserializer instead of KsqlJsonDeserializer
- remove GENERALIZED_SUM_TYPE_SUPPORT_CONFIG for this PR (will do a follow-up PR)
- move test from ValueSpecJsonSerdeSupplierTest to ValueSpecJsonSchemaSerdeSupplierTest
spena committed Jul 13, 2022
1 parent 53f56b1 commit 6f6b241
Showing 4 changed files with 125 additions and 132 deletions.
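Taken together, the first bullet means the schema-registry read path is relocated, not rewritten: the JSON_SR branch removed from KsqlJsonDeserializer below is exactly the converter-plus-translator flow that KsqlConnectDeserializer already encapsulates. A minimal sketch of that flow, mirroring the removed branch rather than the actual KsqlConnectDeserializer source (DataTranslator is ksqlDB-internal, so a hypothetical stand-in interface is used here):

```java
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.storage.Converter;

public final class ConnectPathSketch {

  // Stand-in for ksqlDB's internal DataTranslator (hypothetical interface).
  interface Translator {
    Object toKsqlRow(Schema schema, Object value);
  }

  // The two-step read path: the Connect Converter decodes bytes against the
  // registered JSON schema, then the translator maps the Connect value onto
  // the KSQL row shape.
  static Object deserializeViaConnect(
      final Converter converter,
      final Translator translator,
      final String topic,
      final byte[] bytes
  ) {
    final SchemaAndValue schemaAndValue = converter.toConnectData(topic, bytes);
    return translator.toKsqlRow(schemaAndValue.schema(), schemaAndValue.value());
  }
}
```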
ValueSpecJsonSchemaSerdeSupplierTest.java (new file)
@@ -0,0 +1,78 @@
package io.confluent.ksql.test.serde.json;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableMap;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.schemaregistry.json.JsonSchema;
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.types.SqlDecimal;
import io.confluent.ksql.serde.SerdeFeatures;
import java.io.IOException;
import java.math.BigDecimal;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
public class ValueSpecJsonSchemaSerdeSupplierTest {
  @Mock
  private SchemaRegistryClient srClient;

  @Test
  public void shouldSerializeAndDeserializeDecimalsWithOutStrippingTrailingZeros() throws RestClientException, IOException {
    // Given:
    final ValueSpecJsonSchemaSerdeSupplier srSerde = new ValueSpecJsonSchemaSerdeSupplier(
        LogicalSchema.builder()
            .valueColumn(ColumnName.of("B"), SqlDecimal.of(3, 1))
            .build(),
        SerdeFeatures.of()
    );

    final Serializer<Object> serializer = srSerde.getSerializer(srClient, false);
    final Deserializer<Object> deserializer = srSerde.getDeserializer(srClient, false);

    when(srClient.getLatestSchemaMetadata("t-value"))
        .thenReturn(new SchemaMetadata(0, 1, ""));
    when(srClient.getSchemaBySubjectAndId("t-value", 0))
        .thenReturn(new JsonSchema("{\n" +
            "  \"properties\": {\n" +
            "    \"B\": {\n" +
            "      \"connect.index\": 0,\n" +
            "      \"oneOf\": [\n" +
            "        {\n" +
            "          \"type\": \"null\"\n" +
            "        },\n" +
            "        {\n" +
            "          \"connect.parameters\": {\n" +
            "            \"connect.decimal.precision\": \"3\",\n" +
            "            \"scale\": \"1\"\n" +
            "          },\n" +
            "          \"connect.type\": \"bytes\",\n" +
            "          \"connect.version\": 1,\n" +
            "          \"title\": \"org.apache.kafka.connect.data.Decimal\",\n" +
            "          \"type\": \"number\"\n" +
            "        }\n" +
            "      ]\n" +
            "    }\n" +
            "  },\n" +
            "  \"type\": \"object\"\n" +
            "}"));

    // When:
    final byte[] bytes = serializer.serialize("t",
        ImmutableMap.of("B", new BigDecimal("10.0")));

    // Then:
    assertThat(deserializer.deserialize("t", bytes),
        is(ImmutableMap.of("B", new BigDecimal("10.0"))));
  }
}
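Two details make this round-trip work. In the registered schema stub above, connect.parameters carries the decimal's precision and scale, and the serde factory later in this diff configures DECIMAL_FORMAT_CONFIG to NUMERIC so decimals travel as plain JSON numbers rather than base64 bytes. The assertion then leans on BigDecimal.equals() being scale-sensitive: if the serde stripped trailing zeros, 10.0 (scale 1) would come back as 1E+1 (scale -1) and the Hamcrest is() matcher would fail. A quick JDK refresher (standard java.math behavior, nothing ksqlDB-specific):

```java
import java.math.BigDecimal;

public class DecimalScaleDemo {
  public static void main(String[] args) {
    final BigDecimal withZero = new BigDecimal("10.0");        // value 10.0, scale 1
    final BigDecimal stripped = withZero.stripTrailingZeros(); // value 1E+1, scale -1

    System.out.println(withZero.equals(stripped));             // false: equals() compares scale
    System.out.println(withZero.compareTo(stripped) == 0);     // true: numerically identical
    System.out.println(stripped);                              // prints 1E+1
  }
}
```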
KsqlJsonDeserializer.java
@@ -30,11 +30,8 @@
 import com.fasterxml.jackson.databind.node.TextNode;
 import com.fasterxml.jackson.databind.ser.std.DateSerializer;
 import com.google.common.collect.ImmutableMap;
-import io.confluent.ksql.schema.connect.SchemaWalker;
-import io.confluent.ksql.schema.connect.SchemaWalker.Visitor;
 import io.confluent.ksql.schema.connect.SqlSchemaFormatter;
 import io.confluent.ksql.serde.SerdeUtils;
-import io.confluent.ksql.serde.connect.DataTranslator;
 import io.confluent.ksql.util.DecimalUtil;
 import io.confluent.ksql.util.KsqlException;
 import java.io.IOException;
@@ -55,12 +52,10 @@
 import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Schema.Type;
-import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.data.Time;
 import org.apache.kafka.connect.data.Timestamp;
-import org.apache.kafka.connect.storage.Converter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -92,24 +87,15 @@ public class KsqlJsonDeserializer<T> implements Deserializer<T> {
       .build();
 
   private final Schema schema;
-  private final boolean isJsonSchema;
   private final Class<T> targetType;
-  private final Converter converter;
-  private final DataTranslator translator;
   private String target = "?";
 
   KsqlJsonDeserializer(
       final Schema schema,
-      final boolean isJsonSchema,
-      final Class<T> targetType,
-      final Converter converter,
-      final DataTranslator translator
+      final Class<T> targetType
   ) {
-    this.schema = validateSchema(Objects.requireNonNull(schema, "schema"));
-    this.isJsonSchema = isJsonSchema;
+    this.schema = Objects.requireNonNull(schema, "schema");
     this.targetType = Objects.requireNonNull(targetType, "targetType");
-    this.converter = Objects.requireNonNull(converter, "converter");
-    this.translator = Objects.requireNonNull(translator, "translator");
 
     SerdeUtils.throwOnSchemaJavaTypeMismatch(schema, targetType);
   }
@@ -126,18 +112,11 @@ public T deserialize(final String topic, final byte[] bytes) {
       return null;
     }
 
-    final Object coerced;
-    if (isJsonSchema) {
-      final SchemaAndValue schemaAndValue = converter.toConnectData(topic, bytes);
-      coerced = translator.toKsqlRow(schemaAndValue.schema(), schemaAndValue.value());
-    } else {
-      final JsonNode jsonNode = MAPPER.readTree(bytes);
-      coerced = enforceFieldType(
-          "$",
-          new JsonValueContext(jsonNode, schema)
-      );
-    }
-
+    final JsonNode jsonNode = MAPPER.readTree(bytes);
+    final Object coerced = enforceFieldType(
+        "$",
+        new JsonValueContext(jsonNode, schema)
+    );
 
     if (LOG.isTraceEnabled()) {
       LOG.trace("Deserialized {}. topic:{}, row:{}", target, topic, coerced);
@@ -367,25 +346,4 @@ public String getPath() {
       return path;
     }
   }
-
-  private static Schema validateSchema(final Schema schema) {
-
-    class SchemaValidator implements Visitor<Void, Void> {
-
-      @Override
-      public Void visitMap(final Schema mapSchema, final Void key, final Void value) {
-        if (mapSchema.keySchema().type() != Type.STRING) {
-          throw new KsqlException("JSON only supports MAP types with STRING keys");
-        }
-        return null;
-      }
-
-      public Void visitSchema(final Schema schema11) {
-        return null;
-      }
-    }
-
-    SchemaWalker.visit(schema, new SchemaValidator());
-    return schema;
-  }
 }
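With that hunk applied, KsqlJsonDeserializer is a plain-JSON deserializer again: a two-argument constructor, a single Jackson-based decode path, and no schema validation of its own (that moves into the serde factory in the next file). A rough sketch of the surviving path (illustrative only; the real enforceFieldType and JsonValueContext machinery performs the schema-driven coercion):

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.nio.charset.StandardCharsets;

public class PlainJsonPathDemo {
  public static void main(String[] args) throws Exception {
    // Step 1 of the kept path: parse the raw bytes into a Jackson tree.
    final byte[] bytes = "{\"B\": 10.0}".getBytes(StandardCharsets.UTF_8);
    final JsonNode root = new ObjectMapper().readTree(bytes);

    // Step 2 (done by enforceFieldType in the real class): walk the tree and
    // coerce each node to the column's declared Connect schema type.
    System.out.println(root.get("B").decimalValue()); // 10.0
  }
}
```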
KsqlJsonSerdeFactory.java
@@ -19,16 +19,18 @@
 import com.google.errorprone.annotations.Immutable;
 import io.confluent.connect.json.JsonSchemaConverter;
 import io.confluent.connect.json.JsonSchemaConverterConfig;
-import io.confluent.connect.json.JsonSchemaDataConfig;
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
 import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
+import io.confluent.ksql.schema.connect.SchemaWalker;
 import io.confluent.ksql.serde.SerdeFactory;
 import io.confluent.ksql.serde.SerdeUtils;
 import io.confluent.ksql.serde.connect.ConnectDataTranslator;
 import io.confluent.ksql.serde.connect.ConnectSRSchemaDataTranslator;
+import io.confluent.ksql.serde.connect.KsqlConnectDeserializer;
 import io.confluent.ksql.serde.connect.KsqlConnectSerializer;
 import io.confluent.ksql.serde.tls.ThreadLocalSerializer;
 import io.confluent.ksql.util.KsqlConfig;
+import io.confluent.ksql.util.KsqlException;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
@@ -71,6 +73,8 @@ public <T> Serde<T> createSerde(
       final Class<T> targetType,
       final boolean isKey
   ) {
+    validateSchema(schema);
+
     final Optional<Schema> physicalSchema;
     if (useSchemaRegistryFormat) {
       physicalSchema = properties.getSchemaId().isPresent() ? Optional.of(
@@ -130,13 +134,18 @@ private <T> Deserializer<T> createDeserializer(
       final ConnectDataTranslator dataTranslator,
       final Converter converter
   ) {
-    return new KsqlJsonDeserializer<>(
-        schema,
-        useSchemaRegistryFormat,
-        targetType,
-        converter,
-        dataTranslator
-    );
+    if (useSchemaRegistryFormat) {
+      return new KsqlConnectDeserializer<>(
+          converter,
+          dataTranslator,
+          targetType
+      );
+    } else {
+      return new KsqlJsonDeserializer<>(
+          schema,
+          targetType
+      );
+    }
   }
 
   private static Converter getConverter() {
@@ -168,12 +177,30 @@ private static Converter getSchemaRegistryConverter(
     }
     config.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, DecimalFormat.NUMERIC.name());
 
-    // Makes naming of unions consistent between all SR formats (i.e. connect_union_field_0)
-    config.put(JsonSchemaDataConfig.GENERALIZED_SUM_TYPE_SUPPORT_CONFIG, true);
-
     final Converter converter = new JsonSchemaConverter(schemaRegistryClient);
     converter.configure(config, isKey);
 
     return converter;
   }
+
+  private static Schema validateSchema(final Schema schema) {
+
+    class SchemaValidator implements SchemaWalker.Visitor<Void, Void> {
+
+      @Override
+      public Void visitMap(final Schema mapSchema, final Void key, final Void value) {
+        if (mapSchema.keySchema().type() != Schema.Type.STRING) {
+          throw new KsqlException("JSON only supports MAP types with STRING keys");
+        }
+        return null;
+      }
+
+      public Void visitSchema(final Schema schema11) {
+        return null;
+      }
+    }
+
+    SchemaWalker.visit(schema, new SchemaValidator());
+    return schema;
+  }
 }
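validateSchema and its SchemaWalker visitor arrive here unchanged from KsqlJsonDeserializer, so a map with non-STRING keys is now rejected once, when the serde is created, rather than each time a deserializer is constructed. The sketch below inlines an equivalent check using only the public Kafka Connect API (SchemaWalker and KsqlException are ksqlDB-internal, so the hand-rolled recursion and the exception type are stand-ins):

```java
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

public class MapKeyCheckDemo {

  // Inlined equivalent of the factory's SchemaWalker-based validator
  // (illustration only; the real walk lives in ksqlDB's SchemaWalker).
  static void requireStringMapKeys(final Schema schema) {
    switch (schema.type()) {
      case MAP:
        if (schema.keySchema().type() != Schema.Type.STRING) {
          throw new IllegalStateException("JSON only supports MAP types with STRING keys");
        }
        requireStringMapKeys(schema.valueSchema());
        break;
      case ARRAY:
        requireStringMapKeys(schema.valueSchema());
        break;
      case STRUCT:
        for (final Field field : schema.fields()) {
          requireStringMapKeys(field.schema());
        }
        break;
      default:
        break; // primitives need no walking
    }
  }

  public static void main(String[] args) {
    // Same shape as the schema used by the tests removed from
    // KsqlJsonDeserializerTest at the end of this diff:
    final Schema bad = SchemaBuilder.struct()
        .field("f0", SchemaBuilder
            .map(Schema.OPTIONAL_INT32_SCHEMA, Schema.INT32_SCHEMA)
            .optional()
            .build())
        .build();

    requireStringMapKeys(bad); // throws: JSON only supports MAP types with STRING keys
  }
}
```

Because the check now fires in createSerde, the construction-time tests in KsqlJsonDeserializerTest are superseded, which is why the final file in this diff deletes them.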
KsqlJsonDeserializerTest.java
@@ -41,7 +41,6 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import io.confluent.ksql.serde.SerdeUtils;
-import io.confluent.ksql.serde.connect.ConnectDataTranslator;
 import io.confluent.ksql.serde.connect.ConnectKsqlSchemaTranslator;
 import io.confluent.ksql.util.DecimalUtil;
 import io.confluent.ksql.util.KsqlException;
@@ -63,8 +62,6 @@
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.data.Time;
 import org.apache.kafka.connect.data.Timestamp;
-import org.apache.kafka.connect.json.JsonConverter;
-import org.apache.kafka.connect.storage.Converter;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -132,14 +129,9 @@ public class KsqlJsonDeserializerTest {
 
   private Struct expectedOrder;
   private KsqlJsonDeserializer<Struct> deserializer;
-  private Converter converter;
-  private ConnectDataTranslator dataTranslator;
 
   @Before
   public void before() throws Exception {
-    converter = new JsonConverter();
-    dataTranslator = new ConnectDataTranslator(ORDER_SCHEMA);
-
     expectedOrder = new Struct(ORDER_SCHEMA)
         .put(ORDERTIME, 1511897796092L)
         .put(ORDERID, 1L)
@@ -904,65 +896,6 @@ public void shouldThrowIfCanNotCoerceMapValue() {
         "Can't convert type. sourceType: BooleanNode, requiredType: INTEGER"))));
   }
 
-  @Test
-  public void shouldThrowOnMapSchemaWithNonStringKeys() {
-    // Given:
-    final ConnectSchema schema = (ConnectSchema) SchemaBuilder
-        .struct()
-        .field("f0", SchemaBuilder
-            .map(Schema.OPTIONAL_INT32_SCHEMA, Schema.INT32_SCHEMA)
-            .optional()
-            .build())
-        .build();
-
-    // When:
-    final Exception e = assertThrows(
-        KsqlException.class,
-        () -> new KsqlJsonDeserializer<>(
-            schema,
-            false,
-            Struct.class,
-            converter,
-            dataTranslator
-        )
-    );
-
-    // Then:
-    assertThat(e.getMessage(), containsString(
-        "JSON only supports MAP types with STRING keys"));
-  }
-
-  @Test
-  public void shouldThrowOnNestedMapSchemaWithNonStringKeys() {
-    // Given:
-    final ConnectSchema schema = (ConnectSchema) SchemaBuilder
-        .struct()
-        .field("f0", SchemaBuilder
-            .struct()
-            .field("f1", SchemaBuilder
-                .map(Schema.OPTIONAL_INT32_SCHEMA, Schema.INT32_SCHEMA)
-                .optional()
-                .build())
-            .build())
-        .build();
-
-    // When:
-    final Exception e = assertThrows(
-        KsqlException.class,
-        () -> new KsqlJsonDeserializer<>(
-            schema,
-            false,
-            Struct.class,
-            converter,
-            dataTranslator
-        )
-    );
-
-    // Then:
-    assertThat(e.getMessage(), containsString(
-        "JSON only supports MAP types with STRING keys"));
-  }
-
   @Test
   public void shouldIncludeTopicNameInException() {
     // Given:
@@ -1109,11 +1042,8 @@ private <T> KsqlJsonDeserializer<T> givenDeserializerForSchema(
       final Class<T> type
   ) {
     return new KsqlJsonDeserializer<>(
-        (ConnectSchema) schema,
-        false,
-        type,
-        converter,
-        dataTranslator
+        schema,
+        type
     );
   }
