diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java b/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java index d4e6f14f22b7..b0576320544d 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java @@ -548,6 +548,18 @@ public class KsqlConfig extends AbstractConfig { + "that contains headers columns will work with the headers functionality to prevent " + "a degraded command topic situation when restarting ksqlDB."; + public static final String KSQL_JSON_SR_CONVERTER_DESERIALIZER_ENABLED + = "ksql.json_sr.converter.deserializer.enabled"; + + private static final Boolean KSQL_JSON_SR_CONVERTER_DESERIALIZER_ENABLED_DEFAULT = true; + + private static final String KSQL_JSON_SR_CONVERTER_DESERIALIZER_ENABLED_DOC = "" + + "Feature flag that enables the use of the JsonSchemaConverter class for deserializing " + + "JSON_SR records. JsonSchemaConverter is required to support `anyOf` JSON_SR types. " + + "This flag should be used to disable this feature only when users experience " + + "deserialization issues caused by the JsonSchemaConverter. Otherwise, this flag should " + + "remain true to take advantage of the new `anyOf` types and other JSON_SR serde fixes."; + public static final String KSQL_SOURCE_TABLE_MATERIALIZATION_ENABLED = "ksql.source.table.materialization.enabled"; private static final Boolean KSQL_SOURCE_TABLE_MATERIALIZATION_ENABLED_DEFAULT = true; @@ -1470,6 +1482,13 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) { Importance.LOW, KSQL_WEBSOCKET_CONNECTION_MAX_TIMEOUT_MS_DOC ) + .define( + KSQL_JSON_SR_CONVERTER_DESERIALIZER_ENABLED, + Type.BOOLEAN, + KSQL_JSON_SR_CONVERTER_DESERIALIZER_ENABLED_DEFAULT, + Importance.LOW, + KSQL_JSON_SR_CONVERTER_DESERIALIZER_ENABLED_DOC + ) .withClientSslSupport(); for (final CompatibilityBreakingConfigDef compatibilityBreakingConfigDef diff --git a/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/serde/ConnectSerdeSupplier.java b/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/serde/ConnectSerdeSupplier.java index b18fb37ce77d..16d4de0b5609 100644 --- a/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/serde/ConnectSerdeSupplier.java +++ b/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/serde/ConnectSerdeSupplier.java @@ -87,9 +87,10 @@ public byte[] serialize(final String topic, final Object spec) { } final T schema; + final int id; try { final String subject = KsqlConstants.getSRSubject(topic, isKey); - final int id = srClient.getLatestSchemaMetadata(subject).getId(); + id = srClient.getLatestSchemaMetadata(subject).getId(); schema = (T) srClient.getSchemaBySubjectAndId(subject, id); } catch (Exception e) { throw new KsqlException(e); diff --git a/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/serde/json/ValueSpecJsonSchemaSerdeSupplier.java b/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/serde/json/ValueSpecJsonSchemaSerdeSupplier.java new file mode 100644 index 000000000000..9fa3412fb00e --- /dev/null +++ b/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/serde/json/ValueSpecJsonSchemaSerdeSupplier.java @@ -0,0 +1,36 @@ +/* + * Copyright 2022 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.test.serde.json; + +import io.confluent.connect.json.JsonSchemaConverter; +import io.confluent.kafka.schemaregistry.json.JsonSchema; +import io.confluent.ksql.serde.json.JsonSchemaTranslator; +import io.confluent.ksql.test.serde.ConnectSerdeSupplier; +import org.apache.kafka.connect.data.Schema; + +public class ValueSpecJsonSchemaSerdeSupplier extends ConnectSerdeSupplier { + private final JsonSchemaTranslator schemaTranslator; + + public ValueSpecJsonSchemaSerdeSupplier() { + super(JsonSchemaConverter::new); + this.schemaTranslator = new JsonSchemaTranslator(); + } + + @Override + protected Schema fromParsedSchema(final JsonSchema schema) { + return schemaTranslator.toConnectSchema(schema); + } +} diff --git a/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/serde/json/ValueSpecJsonSerdeSupplier.java b/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/serde/json/ValueSpecJsonSerdeSupplier.java index bfa0fcacf0a0..fa8480cefa27 100644 --- a/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/serde/json/ValueSpecJsonSerdeSupplier.java +++ b/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/serde/json/ValueSpecJsonSerdeSupplier.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableList; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; -import io.confluent.ksql.serde.json.JsonSerdeUtils; import io.confluent.ksql.test.serde.SerdeSupplier; import java.math.BigDecimal; import java.math.BigInteger; @@ -33,7 +32,6 @@ import java.util.Objects; import java.util.function.Function; import java.util.stream.Collectors; -import org.apache.commons.lang3.ArrayUtils; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; @@ -45,14 +43,11 @@ public class ValueSpecJsonSerdeSupplier implements SerdeSupplier { private static final ObjectMapper FLOAT_MAPPER = new ObjectMapper(); - private final boolean useSchemas; private final ObjectMapper mapper; public ValueSpecJsonSerdeSupplier( - final boolean useSchemas, final Map properties ) { - this.useSchemas = useSchemas; mapper = (boolean) (properties.getOrDefault("use.exact.numeric.comparison", true)) ? MAPPER : FLOAT_MAPPER; } @@ -90,13 +85,7 @@ public byte[] serialize(final String topicName, final Object spec) { try { final Object toSerialize = Converter.toJsonNode(spec); final byte[] bytes = mapper.writeValueAsBytes(toSerialize); - if (!useSchemas) { - return bytes; - } - - return ArrayUtils.addAll( - new byte[]{/*magic*/ 0x00, /*schemaID*/ 0x00, 0x00, 0x00, 0x01}, - bytes); + return bytes; } catch (final Exception e) { throw new RuntimeException(e); } @@ -118,9 +107,7 @@ public Object deserialize(final String topicName, final byte[] data) { return null; } try { - return useSchemas - ? JsonSerdeUtils.readJsonSR(data, mapper, Object.class) - : mapper.readValue(data, Object.class); + return mapper.readValue(data, Object.class); } catch (final Exception e) { throw new RuntimeException(e); } diff --git a/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/tools/TopicInfoCache.java b/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/tools/TopicInfoCache.java index 8663576606a7..d5c777bbf20a 100644 --- a/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/tools/TopicInfoCache.java +++ b/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/tools/TopicInfoCache.java @@ -433,7 +433,12 @@ public Serializer getKeySerializer(final Map properties) @SuppressWarnings({"unchecked", "rawtypes"}) public Serializer getValueSerializer(final Map properties) { final SerdeSupplier valueSerdeSupplier = SerdeUtil - .getSerdeSupplier(valueFormat.getFormatInfo(), schema, properties); + .getSerdeSupplier( + valueFormat.getFormatInfo(), + schema, + properties, + valueFormat.getFeatures() + ); final Serializer serializer = valueSerdeSupplier.getSerializer(srClient, false); @@ -466,7 +471,12 @@ public Deserializer getKeyDeserializer(final Map properties) public Deserializer getValueDeserializer(final Map properties) { final SerdeSupplier valueSerdeSupplier = SerdeUtil - .getSerdeSupplier(valueFormat.getFormatInfo(), schema, properties); + .getSerdeSupplier( + valueFormat.getFormatInfo(), + schema, + properties, + valueFormat.getFeatures() + ); final Deserializer deserializer = valueSerdeSupplier.getDeserializer(srClient, false); diff --git a/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/utils/SerdeUtil.java b/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/utils/SerdeUtil.java index 86db18bd8b31..94ac79921b68 100644 --- a/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/utils/SerdeUtil.java +++ b/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/utils/SerdeUtil.java @@ -31,6 +31,7 @@ import io.confluent.ksql.serde.FormatFactory; import io.confluent.ksql.serde.FormatInfo; import io.confluent.ksql.serde.KeyFormat; +import io.confluent.ksql.serde.SerdeFeatures; import io.confluent.ksql.serde.avro.AvroFormat; import io.confluent.ksql.serde.delimited.DelimitedFormat; import io.confluent.ksql.serde.json.JsonFormat; @@ -42,6 +43,7 @@ import io.confluent.ksql.serde.protobuf.ProtobufProperties; import io.confluent.ksql.test.serde.SerdeSupplier; import io.confluent.ksql.test.serde.avro.ValueSpecAvroSerdeSupplier; +import io.confluent.ksql.test.serde.json.ValueSpecJsonSchemaSerdeSupplier; import io.confluent.ksql.test.serde.json.ValueSpecJsonSerdeSupplier; import io.confluent.ksql.test.serde.kafka.KafkaSerdeSupplier; import io.confluent.ksql.test.serde.none.NoneSerdeSupplier; @@ -71,7 +73,8 @@ private SerdeUtil() { public static SerdeSupplier getSerdeSupplier( final FormatInfo formatInfo, final LogicalSchema schema, - final Map properties + final Map properties, + final SerdeFeatures serdeFeatures ) { final Format format = FormatFactory.of(formatInfo); switch (format.name()) { @@ -81,8 +84,8 @@ public static SerdeSupplier getSerdeSupplier( new ProtobufProperties(formatInfo.getProperties())); case ProtobufNoSRFormat.NAME: return new ValueSpecProtobufNoSRSerdeSupplier(schema, formatInfo.getProperties()); - case JsonFormat.NAME: return new ValueSpecJsonSerdeSupplier(false, properties); - case JsonSchemaFormat.NAME: return new ValueSpecJsonSerdeSupplier(true, properties); + case JsonFormat.NAME: return new ValueSpecJsonSerdeSupplier(properties); + case JsonSchemaFormat.NAME: return new ValueSpecJsonSchemaSerdeSupplier(); case DelimitedFormat.NAME: return new StringSerdeSupplier(); case KafkaFormat.NAME: return new KafkaSerdeSupplier(schema); case NoneFormat.NAME: return new NoneSerdeSupplier(); @@ -128,7 +131,9 @@ public static SerdeSupplier getKeySerdeSupplier( final SerdeSupplier inner = (SerdeSupplier) getSerdeSupplier( keyFormat.getFormatInfo(), schema, - properties); + properties, + keyFormat.getFeatures() + ); if (!keyFormat.getWindowType().isPresent()) { return inner; diff --git a/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/serde/json/ValueSpecJsonSchemaSerdeSupplierTest.java b/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/serde/json/ValueSpecJsonSchemaSerdeSupplierTest.java new file mode 100644 index 000000000000..8d279ef99613 --- /dev/null +++ b/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/serde/json/ValueSpecJsonSchemaSerdeSupplierTest.java @@ -0,0 +1,69 @@ +package io.confluent.ksql.test.serde.json; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableMap; +import io.confluent.kafka.schemaregistry.client.SchemaMetadata; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; +import io.confluent.kafka.schemaregistry.json.JsonSchema; +import java.io.IOException; +import java.math.BigDecimal; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class ValueSpecJsonSchemaSerdeSupplierTest { + @Mock + private SchemaRegistryClient srClient; + + @Test + public void shouldSerializeAndDeserializeDecimalsWithOutStrippingTrailingZeros() throws RestClientException, IOException { + // Given: + final ValueSpecJsonSchemaSerdeSupplier srSerde = new ValueSpecJsonSchemaSerdeSupplier(); + + final Serializer serializer = srSerde.getSerializer(srClient, false); + final Deserializer deserializer = srSerde.getDeserializer(srClient, false); + + when(srClient.getLatestSchemaMetadata("t-value")) + .thenReturn(new SchemaMetadata(0, 1, "")); + when(srClient.getSchemaBySubjectAndId("t-value", 0)) + .thenReturn(new JsonSchema("{\n" + + " \"properties\": {\n" + + " \"B\": {\n" + + " \"connect.index\": 0,\n" + + " \"oneOf\": [\n" + + " {\n" + + " \"type\": \"null\"\n" + + " },\n" + + " {\n" + + " \"connect.parameters\": {\n" + + " \"connect.decimal.precision\": \"3\",\n" + + " \"scale\": \"1\"\n" + + " },\n" + + " \"connect.type\": \"bytes\",\n" + + " \"connect.version\": 1,\n" + + " \"title\": \"org.apache.kafka.connect.data.Decimal\",\n" + + " \"type\": \"number\"\n" + + " }\n" + + " ]\n" + + " }\n" + + " },\n" + + " \"type\": \"object\"\n" + + "}")); + + // When: + final byte[] bytes = serializer.serialize("t", + ImmutableMap.of("B", new BigDecimal("10.0"))); + + // Then: + assertThat(deserializer.deserialize("t", bytes), + is(ImmutableMap.of("B", new BigDecimal("10.0")))); + } +} diff --git a/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/serde/json/ValueSpecJsonSerdeSupplierTest.java b/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/serde/json/ValueSpecJsonSerdeSupplierTest.java index eb46615a31f6..80a99ad3d6ec 100644 --- a/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/serde/json/ValueSpecJsonSerdeSupplierTest.java +++ b/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/serde/json/ValueSpecJsonSerdeSupplierTest.java @@ -21,8 +21,6 @@ import com.google.common.collect.ImmutableMap; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; -import io.confluent.ksql.serde.json.JsonSerdeUtils; -import io.confluent.ksql.test.tools.TestJsonMapper; import java.math.BigDecimal; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; @@ -39,12 +37,10 @@ public class ValueSpecJsonSerdeSupplierTest { private SchemaRegistryClient srClient; private ValueSpecJsonSerdeSupplier plainSerde; - private ValueSpecJsonSerdeSupplier srSerde; @Before public void setUp() { - plainSerde = new ValueSpecJsonSerdeSupplier(false, ImmutableMap.of()); - srSerde = new ValueSpecJsonSerdeSupplier(true, ImmutableMap.of()); + plainSerde = new ValueSpecJsonSerdeSupplier(ImmutableMap.of()); } @Test @@ -72,32 +68,4 @@ public void shouldDeserializeDecimalsWithoutStrippingTrailingZeros_Plain() { // Then: assertThat(result, is(new BigDecimal("10.0"))); } - - @Test - public void shouldSerializeDecimalsWithOutStrippingTrailingZeros_Sr() throws Exception { - // Given: - final Serializer serializer = srSerde.getSerializer(srClient, false); - - // When: - final byte[] bytes = serializer.serialize("t", new BigDecimal("10.0")); - - // Then: - assertThat(JsonSerdeUtils.readJsonSR(bytes, TestJsonMapper.INSTANCE.get(), String.class), is("10.0")); - } - - @Test - public void shouldDeserializeDecimalsWithoutStrippingTrailingZeros_Sr() { - // Given: - final Deserializer deserializer = srSerde.getDeserializer(srClient, false); - - final byte[] jsonBytes = "10.0".getBytes(UTF_8); - final byte[] bytes = new byte[jsonBytes.length + JsonSerdeUtils.SIZE_OF_SR_PREFIX]; - System.arraycopy(jsonBytes, 0, bytes, JsonSerdeUtils.SIZE_OF_SR_PREFIX, jsonBytes.length); - - // When: - final Object result = deserializer.deserialize("t", bytes); - - // Then: - assertThat(result, is(new BigDecimal("10.0"))); - } } \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/tools/TestExecutorTest.java b/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/tools/TestExecutorTest.java index a6f9ec48b27e..06b8b823c9b6 100644 --- a/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/tools/TestExecutorTest.java +++ b/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/tools/TestExecutorTest.java @@ -574,7 +574,7 @@ private static ProducerRecord producerRecord( final List
headers ) { final byte[] serializedKey = keySerializer.serialize("", key); - final byte[] serializeValue = new ValueSpecJsonSerdeSupplier(false, ImmutableMap.of()) + final byte[] serializeValue = new ValueSpecJsonSerdeSupplier(ImmutableMap.of()) .getSerializer(null, false) .serialize("", value); diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_extra_optional_field_with_schema_schema_id_in_csas_OK_-_JSON_SR/7.2.0_1638945243141/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_extra_optional_field_with_schema_schema_id_in_csas_OK_-_JSON_SR/7.2.0_1638945243141/spec.json index 5e19474f110f..20bb98566dac 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_extra_optional_field_with_schema_schema_id_in_csas_OK_-_JSON_SR/7.2.0_1638945243141/spec.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_extra_optional_field_with_schema_schema_id_in_csas_OK_-_JSON_SR/7.2.0_1638945243141/spec.json @@ -50,7 +50,8 @@ "id" : 42 }, "value" : { - "c1" : 4 + "c1" : 4, + "c2" : null } } ], "topics" : [ { diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_schema_id_without_elements_OK_-_JSON_SR_SCHEMA/7.1.0_1636006774347/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_schema_id_without_elements_OK_-_JSON_SR_SCHEMA/7.1.0_1636006774347/spec.json index 05dbbb8a2657..2e43ed4619e1 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_schema_id_without_elements_OK_-_JSON_SR_SCHEMA/7.1.0_1636006774347/spec.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_schema_id_without_elements_OK_-_JSON_SR_SCHEMA/7.1.0_1636006774347/spec.json @@ -44,7 +44,9 @@ "type" : "object", "properties" : { "c1" : { - "type" : "integer" + "type" : "integer", + "connect.index": 0, + "connect.type": "int64" } } }, @@ -95,7 +97,9 @@ "type" : "object", "properties" : { "c1" : { - "type" : "integer" + "type" : "integer", + "connect.index": 0, + "connect.type": "int64" } } } diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_schema_id_without_elements_OK_-_JSON_SR_SCHEMA/7.2.0_1638433521594/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_schema_id_without_elements_OK_-_JSON_SR_SCHEMA/7.2.0_1638433521594/spec.json index 1816d09d756b..19f4933b9d9b 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_schema_id_without_elements_OK_-_JSON_SR_SCHEMA/7.2.0_1638433521594/spec.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_schema_id_without_elements_OK_-_JSON_SR_SCHEMA/7.2.0_1638433521594/spec.json @@ -47,7 +47,9 @@ "type" : "object", "properties" : { "c1" : { - "type" : "integer" + "type" : "integer", + "connect.index": 0, + "connect.type": "int64" } } }, @@ -101,7 +103,9 @@ "type" : "object", "properties" : { "c1" : { - "type" : "integer" + "type" : "integer", + "connect.index": 0, + "connect.type": "int64" } } } diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_schema_id_without_key_value_elements_OK_-_JSON_SR_SCHEMA/7.1.0_1636006774568/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_schema_id_without_key_value_elements_OK_-_JSON_SR_SCHEMA/7.1.0_1636006774568/spec.json index f51066c52de3..168a110795a8 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_schema_id_without_key_value_elements_OK_-_JSON_SR_SCHEMA/7.1.0_1636006774568/spec.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_schema_id_without_key_value_elements_OK_-_JSON_SR_SCHEMA/7.1.0_1636006774568/spec.json @@ -54,7 +54,9 @@ "type" : "object", "properties" : { "c1" : { - "type" : "integer" + "type" : "integer", + "connect.index": 0, + "connect.type": "int64" } } }, @@ -115,7 +117,9 @@ "type" : "object", "properties" : { "c1" : { - "type" : "integer" + "type" : "integer", + "connect.index": 0, + "connect.type": "int64" } } } diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_schema_id_without_key_value_elements_OK_-_JSON_SR_SCHEMA/7.2.0_1638433521844/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_schema_id_without_key_value_elements_OK_-_JSON_SR_SCHEMA/7.2.0_1638433521844/spec.json index e8ef5b72acdd..7e9417a5b30a 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_schema_id_without_key_value_elements_OK_-_JSON_SR_SCHEMA/7.2.0_1638433521844/spec.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_schema_id_without_key_value_elements_OK_-_JSON_SR_SCHEMA/7.2.0_1638433521844/spec.json @@ -60,7 +60,9 @@ "type" : "object", "properties" : { "c1" : { - "type" : "integer" + "type" : "integer", + "connect.index": 0, + "connect.type": "int64" } } }, @@ -127,7 +129,9 @@ "type" : "object", "properties" : { "c1" : { - "type" : "integer" + "type" : "integer", + "connect.index": 0, + "connect.type": "int64" } } } diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_schema_id_without_value_elements_OK_-_JSON_SR_SCHEMA/7.1.0_1636006774543/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_schema_id_without_value_elements_OK_-_JSON_SR_SCHEMA/7.1.0_1636006774543/spec.json index 0ff7a00d0ef7..9d8df0a3d839 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_schema_id_without_value_elements_OK_-_JSON_SR_SCHEMA/7.1.0_1636006774543/spec.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_schema_id_without_value_elements_OK_-_JSON_SR_SCHEMA/7.1.0_1636006774543/spec.json @@ -44,7 +44,9 @@ "type" : "object", "properties" : { "c1" : { - "type" : "integer" + "type" : "integer", + "connect.index": 0, + "connect.type": "int64" } } }, @@ -95,7 +97,9 @@ "type" : "object", "properties" : { "c1" : { - "type" : "integer" + "type" : "integer", + "connect.index": 0, + "connect.type": "int64" } } } diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_schema_id_without_value_elements_OK_-_JSON_SR_SCHEMA/7.2.0_1638433521820/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_schema_id_without_value_elements_OK_-_JSON_SR_SCHEMA/7.2.0_1638433521820/spec.json index d3df5ae3a02b..925a62946be9 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_schema_id_without_value_elements_OK_-_JSON_SR_SCHEMA/7.2.0_1638433521820/spec.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_schema_id_without_value_elements_OK_-_JSON_SR_SCHEMA/7.2.0_1638433521820/spec.json @@ -47,7 +47,9 @@ "type" : "object", "properties" : { "c1" : { - "type" : "integer" + "type" : "integer", + "connect.index": 0, + "connect.type": "int64" } } }, @@ -101,7 +103,9 @@ "type" : "object", "properties" : { "c1" : { - "type" : "integer" + "type" : "integer", + "connect.index": 0, + "connect.type": "int64" } } } diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_schema_id_without_value_elements_OK_-_custom_key_name/7.1.0_1636006774633/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_schema_id_without_value_elements_OK_-_custom_key_name/7.1.0_1636006774633/spec.json index 7c1fda7b6d58..2673c1b7197e 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_schema_id_without_value_elements_OK_-_custom_key_name/7.1.0_1636006774633/spec.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_schema_id_without_value_elements_OK_-_custom_key_name/7.1.0_1636006774633/spec.json @@ -44,7 +44,9 @@ "type" : "object", "properties" : { "c1" : { - "type" : "integer" + "type" : "integer", + "connect.index": 0, + "connect.type": "int64" } } }, @@ -95,7 +97,9 @@ "type" : "object", "properties" : { "c1" : { - "type" : "integer" + "type" : "integer", + "connect.index": 0, + "connect.type": "int64" } } } diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_schema_id_without_value_elements_OK_-_custom_key_name/7.2.0_1638433521919/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_schema_id_without_value_elements_OK_-_custom_key_name/7.2.0_1638433521919/spec.json index 76db3df57936..219eafeabeae 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_schema_id_without_value_elements_OK_-_custom_key_name/7.2.0_1638433521919/spec.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_schema_id_without_value_elements_OK_-_custom_key_name/7.2.0_1638433521919/spec.json @@ -47,7 +47,9 @@ "type" : "object", "properties" : { "c1" : { - "type" : "integer" + "type" : "integer", + "connect.index": 0, + "connect.type": "int64" } } }, @@ -101,7 +103,9 @@ "type" : "object", "properties" : { "c1" : { - "type" : "integer" + "type" : "integer", + "connect.index": 0, + "connect.type": "int64" } } } diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_elements_OK_-_JSON_SR_SCHEMA/6.0.0_1584430907534/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_elements_OK_-_JSON_SR_SCHEMA/6.0.0_1584430907534/spec.json index b8e7d060ecc2..b445f707da8a 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_elements_OK_-_JSON_SR_SCHEMA/6.0.0_1584430907534/spec.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_elements_OK_-_JSON_SR_SCHEMA/6.0.0_1584430907534/spec.json @@ -44,7 +44,9 @@ "type" : "object", "properties" : { "c1" : { - "type" : "integer" + "type" : "integer", + "connect.index": 0, + "connect.type": "int64" } } }, @@ -93,7 +95,9 @@ "type" : "object", "properties" : { "c1" : { - "type" : "integer" + "type" : "integer", + "connect.index": 0, + "connect.type": "int64" } } } diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_elements_OK_-_JSON_SR_SCHEMA/6.0.0_1589910857339/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_elements_OK_-_JSON_SR_SCHEMA/6.0.0_1589910857339/spec.json index d8edb2077d8e..eaf0ad221852 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_elements_OK_-_JSON_SR_SCHEMA/6.0.0_1589910857339/spec.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_elements_OK_-_JSON_SR_SCHEMA/6.0.0_1589910857339/spec.json @@ -44,7 +44,9 @@ "type" : "object", "properties" : { "c1" : { - "type" : "integer" + "type" : "integer", + "connect.index": 0, + "connect.type": "int64" } } }, @@ -93,7 +95,9 @@ "type" : "object", "properties" : { "c1" : { - "type" : "integer" + "type" : "integer", + "connect.index": 0, + "connect.type": "int64" } } } diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_elements_OK_-_JSON_SR_SCHEMA/6.0.0_1591141808076/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_elements_OK_-_JSON_SR_SCHEMA/6.0.0_1591141808076/spec.json index 72bfb2838312..cb7621fa8ea8 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_elements_OK_-_JSON_SR_SCHEMA/6.0.0_1591141808076/spec.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_elements_OK_-_JSON_SR_SCHEMA/6.0.0_1591141808076/spec.json @@ -44,7 +44,9 @@ "type" : "object", "properties" : { "c1" : { - "type" : "integer" + "type" : "integer", + "connect.index": 0, + "connect.type": "int64" } } }, @@ -93,7 +95,9 @@ "type" : "object", "properties" : { "c1" : { - "type" : "integer" + "type" : "integer", + "connect.index": 0, + "connect.type": "int64" } } } diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_elements_OK_-_JSON_SR_SCHEMA/7.1.0_1633125924997/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_elements_OK_-_JSON_SR_SCHEMA/7.1.0_1633125924997/spec.json index 54436098c004..f9fb904cf882 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_elements_OK_-_JSON_SR_SCHEMA/7.1.0_1633125924997/spec.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_elements_OK_-_JSON_SR_SCHEMA/7.1.0_1633125924997/spec.json @@ -44,7 +44,9 @@ "type" : "object", "properties" : { "c1" : { - "type" : "integer" + "type" : "integer", + "connect.index": 0, + "connect.type": "int64" } } }, @@ -95,7 +97,9 @@ "type" : "object", "properties" : { "c1" : { - "type" : "integer" + "type" : "integer", + "connect.index": 0, + "connect.type": "int64" } } } diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_JSON_SR_SCHEMA/6.0.0_1584430907668/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_JSON_SR_SCHEMA/6.0.0_1584430907668/spec.json index c6e512a99f49..400f4e9d1aeb 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_JSON_SR_SCHEMA/6.0.0_1584430907668/spec.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_JSON_SR_SCHEMA/6.0.0_1584430907668/spec.json @@ -44,7 +44,9 @@ "type" : "object", "properties" : { "c1" : { - "type" : "integer" + "type" : "integer", + "connect.index": 0, + "connect.type": "int64" } } }, @@ -93,7 +95,9 @@ "type" : "object", "properties" : { "c1" : { - "type" : "integer" + "type" : "integer", + "connect.index": 0, + "connect.type": "int64" } } } diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_JSON_SR_SCHEMA/6.0.0_1589910857489/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_JSON_SR_SCHEMA/6.0.0_1589910857489/spec.json index b313d46daf2f..c3ad347159c3 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_JSON_SR_SCHEMA/6.0.0_1589910857489/spec.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_JSON_SR_SCHEMA/6.0.0_1589910857489/spec.json @@ -44,7 +44,9 @@ "type" : "object", "properties" : { "c1" : { - "type" : "integer" + "type" : "integer", + "connect.index": 0, + "connect.type": "int64" } } }, @@ -93,7 +95,9 @@ "type" : "object", "properties" : { "c1" : { - "type" : "integer" + "type" : "integer", + "connect.index": 0, + "connect.type": "int64" } } } diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_JSON_SR_SCHEMA/7.1.0_1633125925077/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_JSON_SR_SCHEMA/7.1.0_1633125925077/spec.json index 7511bdd16b1b..44747c68dd2e 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_JSON_SR_SCHEMA/7.1.0_1633125925077/spec.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_JSON_SR_SCHEMA/7.1.0_1633125925077/spec.json @@ -44,7 +44,9 @@ "type" : "object", "properties" : { "c1" : { - "type" : "integer" + "type" : "integer", + "connect.index": 0, + "connect.type": "int64" } } }, @@ -95,7 +97,9 @@ "type" : "object", "properties" : { "c1" : { - "type" : "integer" + "type" : "integer", + "connect.index": 0, + "connect.type": "int64" } } } diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_custom_key_name/6.0.0_1588901204000/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_custom_key_name/6.0.0_1588901204000/spec.json index b4996c0b6c5b..d352ed48b054 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_custom_key_name/6.0.0_1588901204000/spec.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_custom_key_name/6.0.0_1588901204000/spec.json @@ -44,7 +44,9 @@ "type" : "object", "properties" : { "c1" : { - "type" : "integer" + "type" : "integer", + "connect.index": 0, + "connect.type": "int64" } } }, @@ -93,7 +95,9 @@ "type" : "object", "properties" : { "c1" : { - "type" : "integer" + "type" : "integer", + "connect.index": 0, + "connect.type": "int64" } } } diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_custom_key_name/6.0.0_1589910857511/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_custom_key_name/6.0.0_1589910857511/spec.json index 8983ab94c5b5..0587ad00a977 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_custom_key_name/6.0.0_1589910857511/spec.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_custom_key_name/6.0.0_1589910857511/spec.json @@ -44,7 +44,9 @@ "type" : "object", "properties" : { "c1" : { - "type" : "integer" + "type" : "integer", + "connect.index": 0, + "connect.type": "int64" } } }, @@ -93,7 +95,9 @@ "type" : "object", "properties" : { "c1" : { - "type" : "integer" + "type" : "integer", + "connect.index": 0, + "connect.type": "int64" } } } diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_custom_key_name/7.1.0_1633125925093/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_custom_key_name/7.1.0_1633125925093/spec.json index a807af8d656c..6840e82cd153 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_custom_key_name/7.1.0_1633125925093/spec.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_custom_key_name/7.1.0_1633125925093/spec.json @@ -44,7 +44,9 @@ "type" : "object", "properties" : { "c1" : { - "type" : "integer" + "type" : "integer", + "connect.index": 0, + "connect.type": "int64" } } }, @@ -95,7 +97,9 @@ "type" : "object", "properties" : { "c1" : { - "type" : "integer" + "type" : "integer", + "connect.index": 0, + "connect.type": "int64" } } } diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/joins_-_stream-stream_key-to-key_-_SR-enabled_key_and_value_format_-_with_inference_-_different_key_schemas/6.2.0_1606799813622/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/joins_-_stream-stream_key-to-key_-_SR-enabled_key_and_value_format_-_with_inference_-_different_key_schemas/6.2.0_1606799813622/spec.json index de56251486c8..5780a7bf2d08 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/joins_-_stream-stream_key-to-key_-_SR-enabled_key_and_value_format_-_with_inference_-_different_key_schemas/6.2.0_1606799813622/spec.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/joins_-_stream-stream_key-to-key_-_SR-enabled_key_and_value_format_-_with_inference_-_different_key_schemas/6.2.0_1606799813622/spec.json @@ -127,8 +127,7 @@ "oneOf" : [ { "type" : "null" }, { - "type" : "string", - "connect.type" : "int32" + "type" : "string" } ] } } @@ -247,8 +246,7 @@ "oneOf" : [ { "type" : "null" }, { - "type" : "string", - "connect.type" : "int32" + "type" : "string" } ] } } diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/joins_-_stream-stream_key-to-key_-_SR-enabled_key_and_value_format_-_with_inference_-_different_key_schemas/7.1.0_1633125991945/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/joins_-_stream-stream_key-to-key_-_SR-enabled_key_and_value_format_-_with_inference_-_different_key_schemas/7.1.0_1633125991945/spec.json index 8f3c4d6419bc..c5ca290946db 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/joins_-_stream-stream_key-to-key_-_SR-enabled_key_and_value_format_-_with_inference_-_different_key_schemas/7.1.0_1633125991945/spec.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/joins_-_stream-stream_key-to-key_-_SR-enabled_key_and_value_format_-_with_inference_-_different_key_schemas/7.1.0_1633125991945/spec.json @@ -127,8 +127,7 @@ "oneOf" : [ { "type" : "null" }, { - "type" : "string", - "connect.type" : "int32" + "type" : "string" } ] } } @@ -268,8 +267,7 @@ "oneOf" : [ { "type" : "null" }, { - "type" : "string", - "connect.type" : "int32" + "type" : "string" } ] } } diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_ARRAY_-_key_-_inference/6.2.0_1607115574018/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_ARRAY_-_key_-_inference/6.2.0_1607115574018/spec.json index 88eb0f9708ff..ddb14dd88e48 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_ARRAY_-_key_-_inference/6.2.0_1607115574018/spec.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_ARRAY_-_key_-_inference/6.2.0_1607115574018/spec.json @@ -87,7 +87,7 @@ "valueSchema" : { "type" : "object", "properties" : { - "COL1" : { + "FOO" : { "connect.index" : 0, "oneOf" : [ { "type" : "null" @@ -154,7 +154,7 @@ "valueSchema" : { "type" : "object", "properties" : { - "COL1" : { + "FOO" : { "connect.index" : 0, "oneOf" : [ { "type" : "null" diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_ARRAY_-_key_-_inference/7.1.0_1633125996121/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_ARRAY_-_key_-_inference/7.1.0_1633125996121/spec.json index b6fea8a7c1c5..2e002341e54e 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_ARRAY_-_key_-_inference/7.1.0_1633125996121/spec.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_ARRAY_-_key_-_inference/7.1.0_1633125996121/spec.json @@ -87,7 +87,7 @@ "valueSchema" : { "type" : "object", "properties" : { - "COL1" : { + "FOO" : { "connect.index" : 0, "oneOf" : [ { "type" : "null" @@ -156,7 +156,7 @@ "valueSchema" : { "type" : "object", "properties" : { - "COL1" : { + "FOO" : { "connect.index" : 0, "oneOf" : [ { "type" : "null" diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_BIGINT_-_key_-_inference/6.2.0_1606796984850/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_BIGINT_-_key_-_inference/6.2.0_1606796984850/spec.json index 200a4003781c..f9da77d9f386 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_BIGINT_-_key_-_inference/6.2.0_1606796984850/spec.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_BIGINT_-_key_-_inference/6.2.0_1606796984850/spec.json @@ -69,7 +69,7 @@ "valueSchema" : { "type" : "object", "properties" : { - "COL1" : { + "FOO" : { "connect.index" : 0, "oneOf" : [ { "type" : "null" @@ -130,7 +130,7 @@ "valueSchema" : { "type" : "object", "properties" : { - "COL1" : { + "FOO" : { "connect.index" : 0, "oneOf" : [ { "type" : "null" diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_BIGINT_-_key_-_inference/7.1.0_1633125995943/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_BIGINT_-_key_-_inference/7.1.0_1633125995943/spec.json index 0131391f57ce..fab42ce7ca35 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_BIGINT_-_key_-_inference/7.1.0_1633125995943/spec.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_BIGINT_-_key_-_inference/7.1.0_1633125995943/spec.json @@ -69,7 +69,7 @@ "valueSchema" : { "type" : "object", "properties" : { - "COL1" : { + "FOO" : { "connect.index" : 0, "oneOf" : [ { "type" : "null" @@ -132,7 +132,7 @@ "valueSchema" : { "type" : "object", "properties" : { - "COL1" : { + "FOO" : { "connect.index" : 0, "oneOf" : [ { "type" : "null" diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_BOOLEAN_-_key_-_inference/6.2.0_1606796984734/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_BOOLEAN_-_key_-_inference/6.2.0_1606796984734/spec.json index e00ce873706e..33903ef0a1d0 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_BOOLEAN_-_key_-_inference/6.2.0_1606796984734/spec.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_BOOLEAN_-_key_-_inference/6.2.0_1606796984734/spec.json @@ -58,14 +58,13 @@ "oneOf" : [ { "type" : "null" }, { - "type" : "boolean", - "connect.type" : "boolean" + "type" : "boolean" } ] }, "valueSchema" : { "type" : "object", "properties" : { - "COL1" : { + "FOO" : { "connect.index" : 0, "oneOf" : [ { "type" : "null" @@ -119,14 +118,13 @@ "oneOf" : [ { "type" : "null" }, { - "type" : "boolean", - "connect.type" : "boolean" + "type" : "boolean" } ] }, "valueSchema" : { "type" : "object", "properties" : { - "COL1" : { + "FOO" : { "connect.index" : 0, "oneOf" : [ { "type" : "null" diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_BOOLEAN_-_key_-_inference/7.1.0_1633125995850/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_BOOLEAN_-_key_-_inference/7.1.0_1633125995850/spec.json index 0b2a77b7c189..bd022d47962e 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_BOOLEAN_-_key_-_inference/7.1.0_1633125995850/spec.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_BOOLEAN_-_key_-_inference/7.1.0_1633125995850/spec.json @@ -58,14 +58,13 @@ "oneOf" : [ { "type" : "null" }, { - "type" : "boolean", - "connect.type" : "boolean" + "type" : "boolean" } ] }, "valueSchema" : { "type" : "object", "properties" : { - "COL1" : { + "FOO" : { "connect.index" : 0, "oneOf" : [ { "type" : "null" @@ -121,14 +120,13 @@ "oneOf" : [ { "type" : "null" }, { - "type" : "boolean", - "connect.type" : "boolean" + "type" : "boolean" } ] }, "valueSchema" : { "type" : "object", "properties" : { - "COL1" : { + "FOO" : { "connect.index" : 0, "oneOf" : [ { "type" : "null" diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_DECIMAL_-_key_-_inference/6.2.0_1606796985191/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_DECIMAL_-_key_-_inference/6.2.0_1606796985191/spec.json index 18905ca51c90..b78feec2bbae 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_DECIMAL_-_key_-_inference/6.2.0_1606796985191/spec.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_DECIMAL_-_key_-_inference/6.2.0_1606796985191/spec.json @@ -75,7 +75,7 @@ "valueSchema" : { "type" : "object", "properties" : { - "COL1" : { + "FOO" : { "connect.index" : 0, "oneOf" : [ { "type" : "null" @@ -142,7 +142,7 @@ "valueSchema" : { "type" : "object", "properties" : { - "COL1" : { + "FOO" : { "connect.index" : 0, "oneOf" : [ { "type" : "null" diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_DECIMAL_-_key_-_inference/7.1.0_1633125996070/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_DECIMAL_-_key_-_inference/7.1.0_1633125996070/spec.json index 40162130550d..e96df1a91880 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_DECIMAL_-_key_-_inference/7.1.0_1633125996070/spec.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_DECIMAL_-_key_-_inference/7.1.0_1633125996070/spec.json @@ -75,7 +75,7 @@ "valueSchema" : { "type" : "object", "properties" : { - "COL1" : { + "FOO" : { "connect.index" : 0, "oneOf" : [ { "type" : "null" @@ -144,7 +144,7 @@ "valueSchema" : { "type" : "object", "properties" : { - "COL1" : { + "FOO" : { "connect.index" : 0, "oneOf" : [ { "type" : "null" diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_DOUBLE_-_key_-_inference/6.2.0_1606796984964/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_DOUBLE_-_key_-_inference/6.2.0_1606796984964/spec.json index df552419887f..a2e70d2adc3f 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_DOUBLE_-_key_-_inference/6.2.0_1606796984964/spec.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_DOUBLE_-_key_-_inference/6.2.0_1606796984964/spec.json @@ -69,7 +69,7 @@ "valueSchema" : { "type" : "object", "properties" : { - "COL1" : { + "FOO" : { "connect.index" : 0, "oneOf" : [ { "type" : "null" @@ -130,7 +130,7 @@ "valueSchema" : { "type" : "object", "properties" : { - "COL1" : { + "FOO" : { "connect.index" : 0, "oneOf" : [ { "type" : "null" diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_DOUBLE_-_key_-_inference/7.1.0_1633125995987/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_DOUBLE_-_key_-_inference/7.1.0_1633125995987/spec.json index 6848eb88d805..45542d84da84 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_DOUBLE_-_key_-_inference/7.1.0_1633125995987/spec.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_DOUBLE_-_key_-_inference/7.1.0_1633125995987/spec.json @@ -69,7 +69,7 @@ "valueSchema" : { "type" : "object", "properties" : { - "COL1" : { + "FOO" : { "connect.index" : 0, "oneOf" : [ { "type" : "null" @@ -132,7 +132,7 @@ "valueSchema" : { "type" : "object", "properties" : { - "COL1" : { + "FOO" : { "connect.index" : 0, "oneOf" : [ { "type" : "null" diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_INT_-_key_-_inference/6.2.0_1606796984805/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_INT_-_key_-_inference/6.2.0_1606796984805/spec.json index 3475f2623d94..01ca5dc6e36e 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_INT_-_key_-_inference/6.2.0_1606796984805/spec.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_INT_-_key_-_inference/6.2.0_1606796984805/spec.json @@ -81,7 +81,7 @@ "valueSchema" : { "type" : "object", "properties" : { - "COL1" : { + "FOO" : { "connect.index" : 0, "oneOf" : [ { "type" : "null" @@ -142,7 +142,7 @@ "valueSchema" : { "type" : "object", "properties" : { - "COL1" : { + "FOO" : { "connect.index" : 0, "oneOf" : [ { "type" : "null" diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_INT_-_key_-_inference/7.1.0_1633125995896/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_INT_-_key_-_inference/7.1.0_1633125995896/spec.json index 1d32426bfcd9..72ebcb586d26 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_INT_-_key_-_inference/7.1.0_1633125995896/spec.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_INT_-_key_-_inference/7.1.0_1633125995896/spec.json @@ -81,7 +81,7 @@ "valueSchema" : { "type" : "object", "properties" : { - "COL1" : { + "FOO" : { "connect.index" : 0, "oneOf" : [ { "type" : "null" @@ -144,7 +144,7 @@ "valueSchema" : { "type" : "object", "properties" : { - "COL1" : { + "FOO" : { "connect.index" : 0, "oneOf" : [ { "type" : "null" diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_STRING_-_key_-_inference/6.2.0_1606796985074/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_STRING_-_key_-_inference/6.2.0_1606796985074/spec.json index fade13e31bb6..a975d18604bb 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_STRING_-_key_-_inference/6.2.0_1606796985074/spec.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_STRING_-_key_-_inference/6.2.0_1606796985074/spec.json @@ -62,14 +62,13 @@ "oneOf" : [ { "type" : "null" }, { - "type" : "string", - "connect.type" : "string" + "type" : "string" } ] }, "valueSchema" : { "type" : "object", "properties" : { - "COL1" : { + "FOO" : { "connect.index" : 0, "oneOf" : [ { "type" : "null" @@ -123,14 +122,13 @@ "oneOf" : [ { "type" : "null" }, { - "type" : "string", - "connect.type" : "string" + "type" : "string" } ] }, "valueSchema" : { "type" : "object", "properties" : { - "COL1" : { + "FOO" : { "connect.index" : 0, "oneOf" : [ { "type" : "null" diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_STRING_-_key_-_inference/7.1.0_1633125996028/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_STRING_-_key_-_inference/7.1.0_1633125996028/spec.json index a785f624992d..7643174fdb22 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_STRING_-_key_-_inference/7.1.0_1633125996028/spec.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_STRING_-_key_-_inference/7.1.0_1633125996028/spec.json @@ -62,14 +62,13 @@ "oneOf" : [ { "type" : "null" }, { - "type" : "string", - "connect.type" : "string" + "type" : "string" } ] }, "valueSchema" : { "type" : "object", "properties" : { - "COL1" : { + "FOO" : { "connect.index" : 0, "oneOf" : [ { "type" : "null" @@ -125,14 +124,13 @@ "oneOf" : [ { "type" : "null" }, { - "type" : "string", - "connect.type" : "string" + "type" : "string" } ] }, "valueSchema" : { "type" : "object", "properties" : { - "COL1" : { + "FOO" : { "connect.index" : 0, "oneOf" : [ { "type" : "null" diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_STRUCT_-_key_-_inference/6.2.0_1607115574065/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_STRUCT_-_key_-_inference/6.2.0_1607115574065/spec.json index 7f5cfa71e2d1..a1b876e88a65 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_STRUCT_-_key_-_inference/6.2.0_1607115574065/spec.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_STRUCT_-_key_-_inference/6.2.0_1607115574065/spec.json @@ -86,8 +86,7 @@ "oneOf" : [ { "type" : "null" }, { - "type" : "string", - "connect.type" : "string" + "type" : "string" } ] } } @@ -95,7 +94,7 @@ "valueSchema" : { "type" : "object", "properties" : { - "COL1" : { + "FOO" : { "connect.index" : 0, "oneOf" : [ { "type" : "null" @@ -153,8 +152,7 @@ "oneOf" : [ { "type" : "null" }, { - "type" : "string", - "connect.type" : "string" + "type" : "string" } ] } } @@ -162,7 +160,7 @@ "valueSchema" : { "type" : "object", "properties" : { - "COL1" : { + "FOO" : { "connect.index" : 0, "oneOf" : [ { "type" : "null" diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_STRUCT_-_key_-_inference/7.1.0_1633125996163/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_STRUCT_-_key_-_inference/7.1.0_1633125996163/spec.json index e4d6ab12b774..4163ff81ebf4 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_STRUCT_-_key_-_inference/7.1.0_1633125996163/spec.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_STRUCT_-_key_-_inference/7.1.0_1633125996163/spec.json @@ -86,8 +86,7 @@ "oneOf" : [ { "type" : "null" }, { - "type" : "string", - "connect.type" : "string" + "type" : "string" } ] } } @@ -95,7 +94,7 @@ "valueSchema" : { "type" : "object", "properties" : { - "COL1" : { + "FOO" : { "connect.index" : 0, "oneOf" : [ { "type" : "null" @@ -155,8 +154,7 @@ "oneOf" : [ { "type" : "null" }, { - "type" : "string", - "connect.type" : "string" + "type" : "string" } ] } } @@ -164,7 +162,7 @@ "valueSchema" : { "type" : "object", "properties" : { - "COL1" : { + "FOO" : { "connect.index" : 0, "oneOf" : [ { "type" : "null" diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_deserialize_anonymous_array_-_value/6.1.0_1597874026054/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_deserialize_anonymous_array_-_value/6.1.0_1597874026054/spec.json index ce5ae60f6515..e3a31e4430b5 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_deserialize_anonymous_array_-_value/6.1.0_1597874026054/spec.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_deserialize_anonymous_array_-_value/6.1.0_1597874026054/spec.json @@ -62,24 +62,18 @@ }, { "name" : "input_topic", "valueSchema" : { - "type" : "object", - "properties" : { - "FOO" : { - "connect.index" : 0, + "oneOf" : [ { + "type" : "null" + }, { + "type" : "array", + "items" : { "oneOf" : [ { "type" : "null" }, { - "type" : "array", - "items" : { - "oneOf" : [ { - "type" : "null" - }, { - "type" : "string" - } ] - } + "type" : "string" } ] } - } + } ] }, "valueFormat" : "JSON", "replicas" : 1, @@ -120,24 +114,18 @@ }, "partitions" : 4, "valueSchema" : { - "type" : "object", - "properties" : { - "FOO" : { - "connect.index" : 0, + "oneOf" : [ { + "type" : "null" + }, { + "type" : "array", + "items" : { "oneOf" : [ { "type" : "null" }, { - "type" : "array", - "items" : { - "oneOf" : [ { - "type" : "null" - }, { - "type" : "string" - } ] - } + "type" : "string" } ] } - } + } ] } }, { "name" : "OUTPUT", diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_deserialize_anonymous_array_-_value_-_with_coercion/6.1.0_1597874026069/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_deserialize_anonymous_array_-_value_-_with_coercion/6.1.0_1597874026069/spec.json index a65c1e9adddb..00d908a2a974 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_deserialize_anonymous_array_-_value_-_with_coercion/6.1.0_1597874026069/spec.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_deserialize_anonymous_array_-_value_-_with_coercion/6.1.0_1597874026069/spec.json @@ -44,24 +44,18 @@ }, { "name" : "input_topic", "valueSchema" : { - "type" : "object", - "properties" : { - "FOO" : { - "connect.index" : 0, + "oneOf" : [ { + "type" : "null" + }, { + "type" : "array", + "items" : { "oneOf" : [ { "type" : "null" }, { - "type" : "array", - "items" : { - "oneOf" : [ { - "type" : "null" - }, { - "type" : "string" - } ] - } + "type" : "string" } ] } - } + } ] }, "valueFormat" : "JSON", "replicas" : 1, @@ -102,24 +96,18 @@ }, "partitions" : 4, "valueSchema" : { - "type" : "object", - "properties" : { - "FOO" : { - "connect.index" : 0, + "oneOf" : [ { + "type" : "null" + }, { + "type" : "array", + "items" : { "oneOf" : [ { "type" : "null" }, { - "type" : "array", - "items" : { - "oneOf" : [ { - "type" : "null" - }, { - "type" : "string" - } ] - } + "type" : "string" } ] } - } + } ] } }, { "name" : "OUTPUT", diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_deserialize_anonymous_primitive_-_value/6.1.0_1597874026009/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_deserialize_anonymous_primitive_-_value/6.1.0_1597874026009/spec.json index 1d1f23cfba9c..862a55279b28 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_deserialize_anonymous_primitive_-_value/6.1.0_1597874026009/spec.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_deserialize_anonymous_primitive_-_value/6.1.0_1597874026009/spec.json @@ -52,18 +52,12 @@ }, { "name" : "input_topic", "valueSchema" : { - "type" : "object", - "properties" : { - "FOO" : { - "connect.index" : 0, - "oneOf" : [ { - "type" : "null" - }, { - "type" : "integer", - "connect.type" : "int32" - } ] - } - } + "oneOf" : [ { + "type" : "null" + }, { + "type" : "integer", + "connect.type" : "int32" + } ] }, "valueFormat" : "JSON", "replicas" : 1, @@ -104,18 +98,12 @@ }, "partitions" : 4, "valueSchema" : { - "type" : "object", - "properties" : { - "FOO" : { - "connect.index" : 0, - "oneOf" : [ { - "type" : "null" - }, { - "type" : "integer", - "connect.type" : "int32" - } ] - } - } + "oneOf" : [ { + "type" : "null" + }, { + "type" : "integer", + "connect.type" : "int32" + } ] } }, { "name" : "OUTPUT", diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_deserialize_anonymous_primitive_-_value_-_with_coercion/6.1.0_1597874026024/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_deserialize_anonymous_primitive_-_value_-_with_coercion/6.1.0_1597874026024/spec.json index 5c629c66ac17..a861e6cc13cb 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_deserialize_anonymous_primitive_-_value_-_with_coercion/6.1.0_1597874026024/spec.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/json_sr_-_deserialize_anonymous_primitive_-_value_-_with_coercion/6.1.0_1597874026024/spec.json @@ -44,17 +44,11 @@ }, { "name" : "input_topic", "valueSchema" : { - "type" : "object", - "properties" : { - "FOO" : { - "connect.index" : 0, - "oneOf" : [ { - "type" : "null" - }, { - "type" : "string" - } ] - } - } + "oneOf" : [ { + "type" : "null" + }, { + "type" : "string" + } ] }, "valueFormat" : "JSON", "replicas" : 1, @@ -95,17 +89,11 @@ }, "partitions" : 4, "valueSchema" : { - "type" : "object", - "properties" : { - "FOO" : { - "connect.index" : 0, - "oneOf" : [ { - "type" : "null" - }, { - "type" : "string" - } ] - } - } + "oneOf" : [ { + "type" : "null" + }, { + "type" : "string" + } ] } }, { "name" : "OUTPUT", diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/elements.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/elements.json index d1f90573f69c..ff0cc3fc9a74 100644 --- a/ksqldb-functional-tests/src/test/resources/query-validation-tests/elements.json +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/elements.json @@ -1454,7 +1454,7 @@ } ], "inputs": [{"topic": "input", "key": {"id": 42}, "value": {"c1": 4}}], - "outputs": [{"topic": "OUTPUT", "key": {"id": 42}, "value": {"c1": 4}}], + "outputs": [{"topic": "OUTPUT", "key": {"id": 42}, "value": {"c1": 4, "c2": null}}], "post": { "sources": [ {"name": "OUTPUT", "type": "stream", "schema": "`ROWKEY` struct<`id` BIGINT> KEY, `c1` BIGINT"} diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/json_sr.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/json_sr.json index 187084c3f483..9f6393835fef 100644 --- a/ksqldb-functional-tests/src/test/resources/query-validation-tests/json_sr.json +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/json_sr.json @@ -349,7 +349,7 @@ "name": "input_topic", "keySchema": {"oneOf":[{"type":"null"},{"type":"boolean","connect.type":"boolean"}]}, "keyFormat": "JSON_SR", - "valueSchema": {"type":"object","properties":{"COL1":{"connect.index":0,"oneOf":[{"type":"null"},{"type":"integer","connect.type":"int32"}]}}}, + "valueSchema": {"type":"object","properties":{"FOO":{"connect.index":0,"oneOf":[{"type":"null"},{"type":"integer","connect.type":"int32"}]}}}, "valueFormat": "JSON_SR" } ], @@ -401,7 +401,7 @@ "name": "input_topic", "keySchema": {"oneOf":[{"type":"null"},{"type":"integer","connect.type":"int32"}]}, "keyFormat": "JSON_SR", - "valueSchema": {"type":"object","properties":{"COL1":{"connect.index":0,"oneOf":[{"type":"null"},{"type":"integer","connect.type":"int32"}]}}}, + "valueSchema": {"type":"object","properties":{"FOO":{"connect.index":0,"oneOf":[{"type":"null"},{"type":"integer","connect.type":"int32"}]}}}, "valueFormat": "JSON_SR" } ], @@ -453,7 +453,7 @@ "name": "input_topic", "keySchema": {"oneOf":[{"type":"null"},{"type":"integer","connect.type":"int64"}]}, "keyFormat": "JSON_SR", - "valueSchema": {"type":"object","properties":{"COL1":{"connect.index":0,"oneOf":[{"type":"null"},{"type":"integer","connect.type":"int32"}]}}}, + "valueSchema": {"type":"object","properties":{"FOO":{"connect.index":0,"oneOf":[{"type":"null"},{"type":"integer","connect.type":"int32"}]}}}, "valueFormat": "JSON_SR" } ], @@ -503,7 +503,7 @@ "name": "input_topic", "keySchema": {"oneOf":[{"type":"null"},{"type":"number","connect.type":"float64"}]}, "keyFormat": "JSON_SR", - "valueSchema": {"type":"object","properties":{"COL1":{"connect.index":0,"oneOf":[{"type":"null"},{"type":"integer","connect.type":"int32"}]}}}, + "valueSchema": {"type":"object","properties":{"FOO":{"connect.index":0,"oneOf":[{"type":"null"},{"type":"integer","connect.type":"int32"}]}}}, "valueFormat": "JSON_SR" } ], @@ -553,7 +553,7 @@ "name": "input_topic", "keySchema": {"oneOf":[{"type":"null"},{"type":"string","connect.type":"string"}]}, "keyFormat": "JSON_SR", - "valueSchema": {"type":"object","properties":{"COL1":{"connect.index":0,"oneOf":[{"type":"null"},{"type":"integer","connect.type":"int32"}]}}}, + "valueSchema": {"type":"object","properties":{"FOO":{"connect.index":0,"oneOf":[{"type":"null"},{"type":"integer","connect.type":"int32"}]}}}, "valueFormat": "JSON_SR" } ], @@ -617,7 +617,7 @@ ] }, "keyFormat": "JSON_SR", - "valueSchema": {"type":"object","properties":{"COL1":{"connect.index":0,"oneOf":[{"type":"null"},{"type":"integer","connect.type":"int32"}]}}}, + "valueSchema": {"type":"object","properties":{"FOO":{"connect.index":0,"oneOf":[{"type":"null"},{"type":"integer","connect.type":"int32"}]}}}, "valueFormat": "JSON_SR" } ], @@ -669,7 +669,7 @@ "name": "input_topic", "keySchema": {"oneOf":[{"type":"null"},{"type":"array","items":{"oneOf":[{"type":"null"},{"type":"string"}]}}]}, "keyFormat": "JSON_SR", - "valueSchema": {"type":"object","properties":{"COL1":{"connect.index":0,"oneOf":[{"type":"null"},{"type":"integer","connect.type":"int32"}]}}}, + "valueSchema": {"type":"object","properties":{"FOO":{"connect.index":0,"oneOf":[{"type":"null"},{"type":"integer","connect.type":"int32"}]}}}, "valueFormat": "JSON_SR" } ], @@ -761,7 +761,7 @@ "name": "input_topic", "keySchema": {"type":"object","properties":{"F1":{"connect.index":0,"oneOf":[{"type":"null"},{"type":"string","connect.type":"string"}]}}}, "keyFormat": "JSON_SR", - "valueSchema": {"type":"object","properties":{"COL1":{"connect.index":0,"oneOf":[{"type":"null"},{"type":"integer","connect.type":"int32"}]}}}, + "valueSchema": {"type":"object","properties":{"FOO":{"connect.index":0,"oneOf":[{"type":"null"},{"type":"integer","connect.type":"int32"}]}}}, "valueFormat": "JSON_SR" } ], diff --git a/ksqldb-serde/src/main/java/io/confluent/ksql/serde/connect/ConnectDataTranslator.java b/ksqldb-serde/src/main/java/io/confluent/ksql/serde/connect/ConnectDataTranslator.java index 62b624bd6610..fc86909a5725 100644 --- a/ksqldb-serde/src/main/java/io/confluent/ksql/serde/connect/ConnectDataTranslator.java +++ b/ksqldb-serde/src/main/java/io/confluent/ksql/serde/connect/ConnectDataTranslator.java @@ -16,6 +16,7 @@ package io.confluent.ksql.serde.connect; import io.confluent.ksql.serde.SerdeUtils; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -232,6 +233,12 @@ private Object toKsqlValue( case STRING: // use String.valueOf to convert various int types and Boolean to string return String.valueOf(convertedValue); + case BYTES: + if (convertedValue instanceof byte[]) { + return ByteBuffer.wrap((byte[]) convertedValue); + } + + return convertedValue; default: return convertedValue; } diff --git a/ksqldb-serde/src/main/java/io/confluent/ksql/serde/json/JsonSchemaTranslator.java b/ksqldb-serde/src/main/java/io/confluent/ksql/serde/json/JsonSchemaTranslator.java index ea01b7472bd2..383742c8c0f9 100644 --- a/ksqldb-serde/src/main/java/io/confluent/ksql/serde/json/JsonSchemaTranslator.java +++ b/ksqldb-serde/src/main/java/io/confluent/ksql/serde/json/JsonSchemaTranslator.java @@ -15,19 +15,27 @@ package io.confluent.ksql.serde.json; +import com.google.common.collect.ImmutableMap; import io.confluent.connect.json.JsonSchemaData; +import io.confluent.connect.json.JsonSchemaDataConfig; import io.confluent.kafka.schemaregistry.ParsedSchema; import io.confluent.kafka.schemaregistry.json.JsonSchema; import io.confluent.ksql.serde.connect.ConnectSchemaTranslator; import java.util.Map; import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.json.DecimalFormat; +import org.apache.kafka.connect.json.JsonConverterConfig; /** * Translates between Connect and JSON Schema Registry schema types. */ -class JsonSchemaTranslator implements ConnectSchemaTranslator { +public class JsonSchemaTranslator implements ConnectSchemaTranslator { - private final JsonSchemaData jsonData = new JsonSchemaData(); + private final JsonSchemaData jsonData = new JsonSchemaData(new JsonSchemaDataConfig( + ImmutableMap.of( + JsonConverterConfig.DECIMAL_FORMAT_CONFIG, DecimalFormat.NUMERIC.name() + ) + )); @Override public void configure(final Map configs) { diff --git a/ksqldb-serde/src/main/java/io/confluent/ksql/serde/json/KsqlJsonDeserializer.java b/ksqldb-serde/src/main/java/io/confluent/ksql/serde/json/KsqlJsonDeserializer.java index 3bb14db99c19..ea9fdc263d06 100644 --- a/ksqldb-serde/src/main/java/io/confluent/ksql/serde/json/KsqlJsonDeserializer.java +++ b/ksqldb-serde/src/main/java/io/confluent/ksql/serde/json/KsqlJsonDeserializer.java @@ -30,8 +30,6 @@ import com.fasterxml.jackson.databind.node.TextNode; import com.fasterxml.jackson.databind.ser.std.DateSerializer; import com.google.common.collect.ImmutableMap; -import io.confluent.ksql.schema.connect.SchemaWalker; -import io.confluent.ksql.schema.connect.SchemaWalker.Visitor; import io.confluent.ksql.schema.connect.SqlSchemaFormatter; import io.confluent.ksql.serde.SerdeUtils; import io.confluent.ksql.util.DecimalUtil; @@ -98,7 +96,7 @@ public class KsqlJsonDeserializer implements Deserializer { final boolean isJsonSchema, final Class targetType ) { - this.schema = validateSchema(Objects.requireNonNull(schema, "schema")); + this.schema = Objects.requireNonNull(schema, "schema"); this.isJsonSchema = isJsonSchema; this.targetType = Objects.requireNonNull(targetType, "targetType"); @@ -357,25 +355,4 @@ public String getPath() { return path; } } - - private static Schema validateSchema(final Schema schema) { - - class SchemaValidator implements Visitor { - - @Override - public Void visitMap(final Schema mapSchema, final Void key, final Void value) { - if (mapSchema.keySchema().type() != Type.STRING) { - throw new KsqlException("JSON only supports MAP types with STRING keys"); - } - return null; - } - - public Void visitSchema(final Schema schema11) { - return null; - } - } - - SchemaWalker.visit(schema, new SchemaValidator()); - return schema; - } } \ No newline at end of file diff --git a/ksqldb-serde/src/main/java/io/confluent/ksql/serde/json/KsqlJsonSerdeFactory.java b/ksqldb-serde/src/main/java/io/confluent/ksql/serde/json/KsqlJsonSerdeFactory.java index dd9e60f34447..a50edb6c610b 100644 --- a/ksqldb-serde/src/main/java/io/confluent/ksql/serde/json/KsqlJsonSerdeFactory.java +++ b/ksqldb-serde/src/main/java/io/confluent/ksql/serde/json/KsqlJsonSerdeFactory.java @@ -21,13 +21,16 @@ import io.confluent.connect.json.JsonSchemaConverterConfig; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; +import io.confluent.ksql.schema.connect.SchemaWalker; import io.confluent.ksql.serde.SerdeFactory; import io.confluent.ksql.serde.SerdeUtils; import io.confluent.ksql.serde.connect.ConnectDataTranslator; import io.confluent.ksql.serde.connect.ConnectSRSchemaDataTranslator; +import io.confluent.ksql.serde.connect.KsqlConnectDeserializer; import io.confluent.ksql.serde.connect.KsqlConnectSerializer; import io.confluent.ksql.serde.tls.ThreadLocalSerializer; import io.confluent.ksql.util.KsqlConfig; +import io.confluent.ksql.util.KsqlException; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -45,7 +48,6 @@ @SuppressWarnings("checkstyle:ClassDataAbstractionCoupling") @Immutable class KsqlJsonSerdeFactory implements SerdeFactory { - private final boolean useSchemaRegistryFormat; private final JsonSchemaProperties properties; @@ -70,6 +72,8 @@ public Serde createSerde( final Class targetType, final boolean isKey ) { + validateSchema(schema); + final Optional physicalSchema; if (useSchemaRegistryFormat) { physicalSchema = properties.getSchemaId().isPresent() ? Optional.of( @@ -79,16 +83,28 @@ public Serde createSerde( physicalSchema = Optional.empty(); } + final Converter converter = useSchemaRegistryFormat + ? getSchemaRegistryConverter(srFactory.get(), ksqlConfig, properties.getSchemaId(), isKey) + : getConverter(); + + // The translators are used in the serializer & deserializzer only for JSON_SR formats + final ConnectDataTranslator dataTranslator = physicalSchema.isPresent() + ? new ConnectSRSchemaDataTranslator(physicalSchema.get()) + : new ConnectDataTranslator(schema); + final Supplier> serializer = () -> createSerializer( - schema, - ksqlConfig, - srFactory, targetType, - physicalSchema, - isKey + dataTranslator, + converter ); - final Deserializer deserializer = createDeserializer(schema, targetType); + final Deserializer deserializer = createDeserializer( + ksqlConfig, + schema, + targetType, + dataTranslator, + converter + ); // Sanity check: serializer.get(); @@ -100,21 +116,10 @@ public Serde createSerde( } private Serializer createSerializer( - final Schema schema, - final KsqlConfig ksqlConfig, - final Supplier srFactory, final Class targetType, - final Optional physicalSchema, - final boolean isKey + final ConnectDataTranslator dataTranslator, + final Converter converter ) { - final Converter converter = useSchemaRegistryFormat - ? getSchemaConverter(srFactory.get(), ksqlConfig, properties.getSchemaId(), isKey) - : getConverter(); - - final ConnectDataTranslator dataTranslator = - physicalSchema.isPresent() ? new ConnectSRSchemaDataTranslator(physicalSchema.get()) - : new ConnectDataTranslator(schema); - return new KsqlConnectSerializer<>( dataTranslator.getSchema(), dataTranslator, @@ -124,14 +129,29 @@ private Serializer createSerializer( } private Deserializer createDeserializer( + final KsqlConfig ksqlConfig, final Schema schema, - final Class targetType + final Class targetType, + final ConnectDataTranslator dataTranslator, + final Converter converter ) { - return new KsqlJsonDeserializer<>( - schema, - useSchemaRegistryFormat, - targetType - ); + if (useSchemaRegistryFormat + && ksqlConfig.getBoolean(KsqlConfig.KSQL_JSON_SR_CONVERTER_DESERIALIZER_ENABLED)) { + return new KsqlConnectDeserializer<>( + converter, + dataTranslator, + targetType + ); + } else { + return new KsqlJsonDeserializer<>( + schema, + // This parameter should be removed once KSQL_JSON_SR_CONVERTER_DESERIALIZER_ENABLED + // is completely deprecated and removed from the code. The KsqlJsonDeserializer class + // should only be used for JSON (no JSON_SR) formats after that. + useSchemaRegistryFormat, + targetType + ); + } } private static Converter getConverter() { @@ -143,7 +163,7 @@ private static Converter getConverter() { return converter; } - private static Converter getSchemaConverter( + private static Converter getSchemaRegistryConverter( final SchemaRegistryClient schemaRegistryClient, final KsqlConfig ksqlConfig, final Optional schemaId, @@ -168,4 +188,25 @@ private static Converter getSchemaConverter( return converter; } + + private static Schema validateSchema(final Schema schema) { + + class SchemaValidator implements SchemaWalker.Visitor { + + @Override + public Void visitMap(final Schema mapSchema, final Void key, final Void value) { + if (mapSchema.keySchema().type() != Schema.Type.STRING) { + throw new KsqlException("JSON only supports MAP types with STRING keys"); + } + return null; + } + + public Void visitSchema(final Schema schema11) { + return null; + } + } + + SchemaWalker.visit(schema, new SchemaValidator()); + return schema; + } } diff --git a/ksqldb-serde/src/test/java/io/confluent/ksql/serde/avro/AvroDataTranslatorTest.java b/ksqldb-serde/src/test/java/io/confluent/ksql/serde/avro/AvroDataTranslatorTest.java index 34cac8383e13..8629085917d1 100644 --- a/ksqldb-serde/src/test/java/io/confluent/ksql/serde/avro/AvroDataTranslatorTest.java +++ b/ksqldb-serde/src/test/java/io/confluent/ksql/serde/avro/AvroDataTranslatorTest.java @@ -28,6 +28,8 @@ import io.confluent.ksql.util.DecimalUtil; import io.confluent.ksql.util.KsqlException; import java.math.BigDecimal; +import java.nio.ByteBuffer; + import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; @@ -357,6 +359,6 @@ public void shouldReturnBytes() { // Then: assertThat( translator.toKsqlRow(Schema.BYTES_SCHEMA, new byte[] {123}), - is(new byte[] {123})); + is(ByteBuffer.wrap(new byte[] {123}))); } } diff --git a/ksqldb-serde/src/test/java/io/confluent/ksql/serde/json/KsqlJsonDeserializerTest.java b/ksqldb-serde/src/test/java/io/confluent/ksql/serde/json/KsqlJsonDeserializerTest.java index 60e1ccc98acd..3a897d531eb7 100644 --- a/ksqldb-serde/src/test/java/io/confluent/ksql/serde/json/KsqlJsonDeserializerTest.java +++ b/ksqldb-serde/src/test/java/io/confluent/ksql/serde/json/KsqlJsonDeserializerTest.java @@ -43,7 +43,6 @@ import io.confluent.ksql.serde.SerdeUtils; import io.confluent.ksql.serde.connect.ConnectKsqlSchemaTranslator; import io.confluent.ksql.util.DecimalUtil; -import io.confluent.ksql.util.KsqlException; import java.io.IOException; import java.math.BigDecimal; import java.nio.ByteBuffer; @@ -57,7 +56,6 @@ import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.kafka.common.errors.SerializationException; -import org.apache.kafka.connect.data.ConnectSchema; import org.apache.kafka.connect.data.Date; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; @@ -146,7 +144,7 @@ public static Collection data() { private KsqlJsonDeserializer deserializer; @Before - public void before() { + public void before() throws Exception { expectedOrder = new Struct(ORDER_SCHEMA) .put(ORDERTIME, 1511897796092L) .put(ORDERID, 1L) @@ -226,7 +224,7 @@ public void shouldThrowIfNotAnObject() { public void shouldThrowIfFieldCanNotBeCoerced() { // Given: final Map value = new HashMap<>(AN_ORDER); - value.put("ordertime", true); + value.put(ORDERTIME, true); final byte[] bytes = serializeJson(value); @@ -911,53 +909,6 @@ public void shouldThrowIfCanNotCoerceMapValue() { "Can't convert type. sourceType: BooleanNode, requiredType: INTEGER")))); } - @Test - public void shouldThrowOnMapSchemaWithNonStringKeys() { - // Given: - final ConnectSchema schema = (ConnectSchema) SchemaBuilder - .struct() - .field("f0", SchemaBuilder - .map(Schema.OPTIONAL_INT32_SCHEMA, Schema.INT32_SCHEMA) - .optional() - .build()) - .build(); - - // When: - final Exception e = assertThrows( - KsqlException.class, - () -> new KsqlJsonDeserializer<>(schema, false, Struct.class) - ); - - // Then: - assertThat(e.getMessage(), containsString( - "JSON only supports MAP types with STRING keys")); - } - - @Test - public void shouldThrowOnNestedMapSchemaWithNonStringKeys() { - // Given: - final ConnectSchema schema = (ConnectSchema) SchemaBuilder - .struct() - .field("f0", SchemaBuilder - .struct() - .field("f1", SchemaBuilder - .map(Schema.OPTIONAL_INT32_SCHEMA, Schema.INT32_SCHEMA) - .optional() - .build()) - .build()) - .build(); - - // When: - final Exception e = assertThrows( - KsqlException.class, - () -> new KsqlJsonDeserializer<>(schema, false, Struct.class) - ); - - // Then: - assertThat(e.getMessage(), containsString( - "JSON only supports MAP types with STRING keys")); - } - @Test public void shouldIncludeTopicNameInException() { // Given: @@ -1103,7 +1054,11 @@ private KsqlJsonDeserializer givenDeserializerForSchema( final Schema schema, final Class type ) { - return new KsqlJsonDeserializer<>((ConnectSchema) schema, useSchemas, type); + return new KsqlJsonDeserializer<>( + schema, + useSchemas, + type + ); } private byte[] serializeJson(final Object expected) { diff --git a/ksqldb-serde/src/test/java/io/confluent/ksql/serde/json/KsqlJsonSchemaDeserializerTest.java b/ksqldb-serde/src/test/java/io/confluent/ksql/serde/json/KsqlJsonSchemaDeserializerTest.java new file mode 100644 index 000000000000..7947742925d8 --- /dev/null +++ b/ksqldb-serde/src/test/java/io/confluent/ksql/serde/json/KsqlJsonSchemaDeserializerTest.java @@ -0,0 +1,122 @@ +package io.confluent.ksql.serde.json; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.confluent.kafka.schemaregistry.ParsedSchema; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.ksql.serde.connect.ConnectKsqlSchemaTranslator; +import io.confluent.ksql.util.KsqlConfig; +import java.nio.ByteBuffer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class KsqlJsonSchemaDeserializerTest { + private static final String SOME_TOPIC = "bob"; + + private static final String ORDERTIME = "ORDERTIME"; + private static final String ORDERID = "@ORDERID"; + private static final String ITEMID = "ITEMID"; + private static final String ORDERUNITS = "ORDERUNITS"; + private static final String ARRAYCOL = "ARRAYCOL"; + private static final String MAPCOL = "MAPCOL"; + private static final String CASE_SENSITIVE_FIELD = "caseField"; + private static final String TIMEFIELD = "TIMEFIELD"; + private static final String DATEFIELD = "DATEFIELD"; + private static final String TIMESTAMPFIELD = "TIMESTAMPFIELD"; + private static final String BYTESFIELD = "BYTESFIELD"; + + private static final Schema ORDER_SCHEMA = SchemaBuilder.struct() + .field(ORDERTIME, Schema.OPTIONAL_INT64_SCHEMA) + .field(ORDERID, Schema.OPTIONAL_INT64_SCHEMA) + .field(ITEMID, Schema.OPTIONAL_STRING_SCHEMA) + .field(ORDERUNITS, Schema.OPTIONAL_FLOAT64_SCHEMA) + .field(CASE_SENSITIVE_FIELD, Schema.OPTIONAL_INT64_SCHEMA) + .field(ARRAYCOL, SchemaBuilder + .array(Schema.OPTIONAL_FLOAT64_SCHEMA) + .optional() + .build()) + .field(MAPCOL, SchemaBuilder + .map(Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_FLOAT64_SCHEMA) + .optional() + .build()) + .field(TIMEFIELD, ConnectKsqlSchemaTranslator.OPTIONAL_TIME_SCHEMA) + .field(DATEFIELD, ConnectKsqlSchemaTranslator.OPTIONAL_DATE_SCHEMA) + .field(TIMESTAMPFIELD, ConnectKsqlSchemaTranslator.OPTIONAL_TIMESTAMP_SCHEMA) + .field(BYTESFIELD, Schema.OPTIONAL_BYTES_SCHEMA) + .version(0) + .build(); + + private static final Struct AN_ORDER = new Struct(ORDER_SCHEMA) + .put(ORDERTIME, 1511897796092L) + .put(ORDERID, 1L) + .put(ITEMID, "Item_1") + .put(ORDERUNITS, 10.0) + .put(ARRAYCOL, ImmutableList.of(10.0, 20.0)) + .put(MAPCOL, ImmutableMap.of("key1", 10.0)) + .put(CASE_SENSITIVE_FIELD, 1L) + .put(TIMEFIELD, new java.sql.Time(1000)) + .put(DATEFIELD, new java.sql.Date(864000000L)) + .put(TIMESTAMPFIELD, new java.sql.Timestamp(1000)) + .put(BYTESFIELD, ByteBuffer.wrap(new byte[] {123})); + + private Serializer serializer; + private Deserializer deserializer; + private SchemaRegistryClient schemaRegistryClient; + private ParsedSchema schema; + + @Before + public void before() throws Exception { + schema = (new JsonSchemaTranslator()).fromConnectSchema(ORDER_SCHEMA.schema()); + + schemaRegistryClient = mock(SchemaRegistryClient.class); + when(schemaRegistryClient.getSchemaBySubjectAndId(anyString(), anyInt())).thenReturn(schema); + + final KsqlJsonSerdeFactory jsonSerdeFactory = + new KsqlJsonSerdeFactory(new JsonSchemaProperties(ImmutableMap.of())); + + final Serde serde = jsonSerdeFactory.createSerde( + ORDER_SCHEMA, + new KsqlConfig(ImmutableMap.of()), + () -> schemaRegistryClient, + Struct.class, + false + ); + + serializer = serde.serializer(); + deserializer = serde.deserializer(); + } + + @Test + public void shouldDeserializeJsonObjectCorrectly() { + // Given: + final byte[] bytes = serializer.serialize(SOME_TOPIC, AN_ORDER); + + // When: + final Struct result = deserializer.deserialize(SOME_TOPIC, bytes); + + // Then: + assertThat(result, is(AN_ORDER)); + } + + @Test + public void shouldDeserializeNullAsNull() { + assertThat(deserializer.deserialize(SOME_TOPIC, null), is(nullValue())); + } +} diff --git a/ksqldb-serde/src/test/java/io/confluent/ksql/serde/json/KsqlJsonSerdeFactoryTest.java b/ksqldb-serde/src/test/java/io/confluent/ksql/serde/json/KsqlJsonSerdeFactoryTest.java index 52a952b57fa3..b246fcc61d8a 100644 --- a/ksqldb-serde/src/test/java/io/confluent/ksql/serde/json/KsqlJsonSerdeFactoryTest.java +++ b/ksqldb-serde/src/test/java/io/confluent/ksql/serde/json/KsqlJsonSerdeFactoryTest.java @@ -16,14 +16,21 @@ package io.confluent.ksql.serde.json; import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertThrows; +import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableMap; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.ksql.serde.connect.KsqlConnectDeserializer; +import io.confluent.ksql.serde.connect.KsqlConnectSerializer; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; import java.util.function.Supplier; + +import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.connect.data.ConnectSchema; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; @@ -39,15 +46,48 @@ public class KsqlJsonSerdeFactoryTest { @Mock private KsqlConfig config; @Mock - private Supplier srFactory; - @Mock private KsqlJsonSerdeFactory jsonFactory; + @Mock + private SchemaRegistryClient schemaRegistryClient; + + private Supplier srFactory; @Before public void setUp() { + srFactory = () -> schemaRegistryClient; jsonFactory = new KsqlJsonSerdeFactory(new JsonSchemaProperties(ImmutableMap.of())); } + @Test + public void shouldUseNewJsonSchemaDeserializerOnJsonSrWhenJsonSchemaConverterIsEnabled() { + // Given + final ConnectSchema connectSchema = (ConnectSchema) SchemaBuilder.string().build(); + when(config.getBoolean(KsqlConfig.KSQL_JSON_SR_CONVERTER_DESERIALIZER_ENABLED)) + .thenReturn(true); + + // When + final Serde serde = + jsonFactory.createSerde(connectSchema, config, srFactory, String.class, false); + + // Then + assertThat(serde.deserializer(), is(instanceOf(KsqlConnectDeserializer.class))); + } + + @Test + public void shouldUseOldJsonDeserializerOnJsonSrWhenJsonSchemaConverterIsDisabled() { + // Given + final ConnectSchema connectSchema = (ConnectSchema) SchemaBuilder.string().build(); + when(config.getBoolean(KsqlConfig.KSQL_JSON_SR_CONVERTER_DESERIALIZER_ENABLED)) + .thenReturn(false); + + // When + final Serde serde = + jsonFactory.createSerde(connectSchema, config, srFactory, String.class, false); + + // Then + assertThat(serde.deserializer(), is(instanceOf(KsqlJsonDeserializer.class))); + } + @Test public void shouldThrowOnMapWithNoneStringKeys() { // Given: