From 1c3c8b2fe25dc8ad825a18f1907d3685c6fb8fa3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20Pe=C3=B1a?= Date: Mon, 11 Apr 2022 18:27:25 -0500 Subject: [PATCH] Return to AvroSRSchemaDataTranslator and use ConnectSRSchemaDataTranslator... for Protobuf/Json --- .../schema/registry/SchemaRegistryUtil.java | 35 ++-- .../query-validation-tests/elements.json | 2 +- .../execution/InsertValuesExecutor.java | 2 +- .../avro/AvroSRSchemaDataTranslator.java | 69 ++++++++ .../ksql/serde/avro/KsqlAvroSerdeFactory.java | 3 +- .../ConnectSRSchemaDataTranslator.java | 93 ++--------- .../ksql/serde/json/KsqlJsonSerdeFactory.java | 3 +- .../protobuf/ProtobufDataTranslator.java | 62 ++----- .../ksql/serde/protobuf/ProtobufSchemas.java | 20 +-- .../serde/protobuf/ProtobufSerdeFactory.java | 2 +- .../avro/AvroSRSchemaDataTranslatorTest.java | 152 ++++++++++++++++++ .../serde/avro/KsqlAvroSerializerTest.java | 4 +- .../ConnectSRSchemaDataTranslatorTest.java | 107 +----------- .../serde/protobuf/ProtobufSchemasTest.java | 30 ---- 14 files changed, 275 insertions(+), 309 deletions(-) create mode 100644 ksqldb-serde/src/main/java/io/confluent/ksql/serde/avro/AvroSRSchemaDataTranslator.java create mode 100644 ksqldb-serde/src/test/java/io/confluent/ksql/serde/avro/AvroSRSchemaDataTranslatorTest.java diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/schema/registry/SchemaRegistryUtil.java b/ksqldb-engine/src/main/java/io/confluent/ksql/schema/registry/SchemaRegistryUtil.java index e02d80f95e54..4c1f69a07f1d 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/schema/registry/SchemaRegistryUtil.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/schema/registry/SchemaRegistryUtil.java @@ -123,22 +123,26 @@ public static Optional getParsedSchema( try { return Optional.of(srClient.getSchemaById(schemaId)); } catch (final Exception e) { - if (isAuthErrorCode(e)) { - final AclOperation deniedOperation = SchemaRegistryUtil.getDeniedOperation(e.getMessage()); - - if (deniedOperation != AclOperation.UNKNOWN) { - throw new KsqlSchemaAuthorizationException( - deniedOperation, - subject - ); - } - } + throwOnAuthError(e, subject); throw new KsqlException( "Could not get schema for subject " + subject + " and id " + schemaId, e); } } + private static void throwOnAuthError(final Exception e, final String subject) { + if (isAuthErrorCode(e)) { + final AclOperation deniedOperation = SchemaRegistryUtil.getDeniedOperation(e.getMessage()); + + if (deniedOperation != AclOperation.UNKNOWN) { + throw new KsqlSchemaAuthorizationException( + deniedOperation, + subject + ); + } + } + } + public static Optional getLatestSchema( final SchemaRegistryClient srClient, final String topic, @@ -160,16 +164,7 @@ public static Optional getLatestSchema( return Optional.empty(); } - if (isAuthErrorCode(e)) { - final AclOperation deniedOperation = SchemaRegistryUtil.getDeniedOperation(e.getMessage()); - - if (deniedOperation != AclOperation.UNKNOWN) { - throw new KsqlSchemaAuthorizationException( - deniedOperation, - subject - ); - } - } + throwOnAuthError(e, subject); throw new KsqlException("Could not get latest schema for subject " + subject, e); } diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/elements.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/elements.json index 57e7e661c703..18c5aac3668f 100644 --- a/ksqldb-functional-tests/src/test/resources/query-validation-tests/elements.json +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/elements.json @@ -1258,7 +1258,7 @@ "outputs": [{"topic": "OUTPUT", "key": 42, "value": {"c1": 4}}], "expectedException": { "type": "org.apache.kafka.streams.errors.StreamsException", - "message": "Error serializing message to topic: OUTPUT. Missing default value for required field: [c2]." + "message": "Error serializing message to topic: OUTPUT. Missing default value for required Avro field: [c2]." } }, { diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/InsertValuesExecutor.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/InsertValuesExecutor.java index d8652d06e0de..b4e0819223e4 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/InsertValuesExecutor.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/InsertValuesExecutor.java @@ -371,7 +371,7 @@ private static void ensureKeySchemasMatch( final FormatInfo formatInfo = addSerializerMissingFormatFields( schemaRegistryClient, - dataSource.getKsqlTopic().getValueFormat().getFormatInfo(), + dataSource.getKsqlTopic().getKeyFormat().getFormatInfo(), dataSource.getKafkaTopicName(), true ); diff --git a/ksqldb-serde/src/main/java/io/confluent/ksql/serde/avro/AvroSRSchemaDataTranslator.java b/ksqldb-serde/src/main/java/io/confluent/ksql/serde/avro/AvroSRSchemaDataTranslator.java new file mode 100644 index 000000000000..05b1824bba3d --- /dev/null +++ b/ksqldb-serde/src/main/java/io/confluent/ksql/serde/avro/AvroSRSchemaDataTranslator.java @@ -0,0 +1,69 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.serde.avro; + +import io.confluent.ksql.serde.connect.ConnectSRSchemaDataTranslator; +import io.confluent.ksql.util.KsqlException; +import java.util.Optional; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; + +/** + * Translates KSQL data and schemas to Avro equivalents. + * + *

Responsible for converting the KSQL schema to a version ready for connect to convert to an + * avro schema. + * + *

This includes ensuring field names are valid Avro field names and that nested types do not + * have name clashes. + */ +public class AvroSRSchemaDataTranslator extends ConnectSRSchemaDataTranslator { + + AvroSRSchemaDataTranslator(final Schema schema) { + super(schema); + } + + @Override + public Object toConnectRow(final Object ksqlData) { + if (!(ksqlData instanceof Struct)) { + return ksqlData; + } + final Schema schema = getSchema(); + final Struct struct = new Struct(schema); + final Struct originalData = (Struct) ksqlData; + final Schema originalSchema = originalData.schema(); + + validate(originalSchema); + + for (final Field field : schema.fields()) { + final Optional originalField = originalSchema.fields().stream() + .filter(f -> field.name().equals(f.name())).findFirst(); + if (originalField.isPresent()) { + struct.put(field, originalData.get(originalField.get())); + } else { + if (field.schema().defaultValue() != null || field.schema().isOptional()) { + struct.put(field, field.schema().defaultValue()); + } else { + throw new KsqlException("Missing default value for required Avro field: [" + field.name() + + "]. This field appears in Avro schema in Schema Registry"); + } + } + } + + return struct; + } +} diff --git a/ksqldb-serde/src/main/java/io/confluent/ksql/serde/avro/KsqlAvroSerdeFactory.java b/ksqldb-serde/src/main/java/io/confluent/ksql/serde/avro/KsqlAvroSerdeFactory.java index f116ff6eef82..1360fac191fd 100644 --- a/ksqldb-serde/src/main/java/io/confluent/ksql/serde/avro/KsqlAvroSerdeFactory.java +++ b/ksqldb-serde/src/main/java/io/confluent/ksql/serde/avro/KsqlAvroSerdeFactory.java @@ -23,7 +23,6 @@ import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; import io.confluent.ksql.serde.SerdeUtils; import io.confluent.ksql.serde.connect.ConnectDataTranslator; -import io.confluent.ksql.serde.connect.ConnectSRSchemaDataTranslator; import io.confluent.ksql.serde.connect.DataTranslator; import io.confluent.ksql.serde.connect.KsqlConnectDeserializer; import io.confluent.ksql.serde.connect.KsqlConnectSerializer; @@ -151,7 +150,7 @@ private DataTranslator createAvroTranslator(final Schema schema, // deserialization, if physical schema exists, we use original schema to translate to ksql data. return physicalSchema.map( value -> isDeserializer ? new ConnectDataTranslator(schema) - : new ConnectSRSchemaDataTranslator(value, AvroFormat.NAME)) + : new AvroSRSchemaDataTranslator(value)) .orElseGet(() -> new AvroDataTranslator(schema, fullSchemaName)); } diff --git a/ksqldb-serde/src/main/java/io/confluent/ksql/serde/connect/ConnectSRSchemaDataTranslator.java b/ksqldb-serde/src/main/java/io/confluent/ksql/serde/connect/ConnectSRSchemaDataTranslator.java index 03368e3785f8..25f1ca44f350 100644 --- a/ksqldb-serde/src/main/java/io/confluent/ksql/serde/connect/ConnectSRSchemaDataTranslator.java +++ b/ksqldb-serde/src/main/java/io/confluent/ksql/serde/connect/ConnectSRSchemaDataTranslator.java @@ -15,10 +15,7 @@ package io.confluent.ksql.serde.connect; -import io.confluent.ksql.serde.avro.AvroFormat; -import io.confluent.ksql.serde.protobuf.ProtobufFormat; import io.confluent.ksql.util.KsqlException; -import java.util.Optional; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Schema.Type; @@ -33,19 +30,9 @@ * fail. */ public class ConnectSRSchemaDataTranslator extends ConnectDataTranslator { - private RowTranslator rowTranslator; - public ConnectSRSchemaDataTranslator(final Schema schema, final String formatName) { + public ConnectSRSchemaDataTranslator(final Schema schema) { super(schema); - - switch (formatName.trim().toUpperCase()) { - case AvroFormat.NAME: - case ProtobufFormat.NAME: - this.rowTranslator = new AvroAndProtobufRowTranslator(); - break; - default: - this.rowTranslator = new DefaultRowTranslator(); - } } protected void validate(final Schema originalSchema) { @@ -66,77 +53,25 @@ protected void validate(final Schema originalSchema) { @Override public Object toConnectRow(final Object ksqlData) { - return rowTranslator.translate(ksqlData); - } - - private interface RowTranslator { - Object translate(Object ksqlData); - } - - /** - * Reconstruct ksqlData struct with given schema and try to put original data in it. - * Schema may have more fields than ksqlData, don't put those field by default. If needed by - * some format like Avro, use the AvroAndProtobufRowTranslator - */ - private class DefaultRowTranslator implements RowTranslator { - @Override - public Object translate(final Object ksqlData) { - if (ksqlData instanceof Struct) { - validate(((Struct) ksqlData).schema()); - final Schema schema = getSchema(); - final Struct struct = new Struct(schema); - final Struct source = (Struct) ksqlData; - - for (final Field sourceField : source.schema().fields()) { - final Object value = source.get(sourceField); - struct.put(sourceField.name(), value); - } - - return struct; - } - - return ksqlData; - } - } - - /** - * Translates KSQL data and schemas to Avro and Protobuf equivalents. - * - *

Responsible for converting the KSQL schema to a version ready for connect to convert to an - * avro and protobuf schema. - * - *

This includes ensuring field names are valid Avro and Protobuf field names and that nested - * types do not have name clashes. - */ - private class AvroAndProtobufRowTranslator implements RowTranslator { - @Override - public Object translate(final Object ksqlData) { - if (!(ksqlData instanceof Struct)) { - return ksqlData; - } + /* + * Reconstruct ksqlData struct with given schema and try to put original data in it. + * Schema may have more fields than ksqlData, don't put those field by default. If needed by + * some format like Avro, create new subclass to handle + */ + if (ksqlData instanceof Struct) { + validate(((Struct) ksqlData).schema()); final Schema schema = getSchema(); final Struct struct = new Struct(schema); - final Struct originalData = (Struct) ksqlData; - final Schema originalSchema = originalData.schema(); + final Struct source = (Struct) ksqlData; - validate(originalSchema); - - for (final Field field : schema.fields()) { - final Optional originalField = originalSchema.fields().stream() - .filter(f -> field.name().equals(f.name())).findFirst(); - if (originalField.isPresent()) { - struct.put(field, originalData.get(originalField.get())); - } else { - if (field.schema().defaultValue() != null || field.schema().isOptional()) { - struct.put(field, field.schema().defaultValue()); - } else { - throw new KsqlException("Missing default value for required field: [" + field.name() - + "]. This field appears in the schema in Schema Registry"); - } - } + for (final Field sourceField : source.schema().fields()) { + final Object value = source.get(sourceField); + struct.put(sourceField.name(), value); } return struct; } + + return ksqlData; } } diff --git a/ksqldb-serde/src/main/java/io/confluent/ksql/serde/json/KsqlJsonSerdeFactory.java b/ksqldb-serde/src/main/java/io/confluent/ksql/serde/json/KsqlJsonSerdeFactory.java index ad29bc03fc50..a1b2ccd928bb 100644 --- a/ksqldb-serde/src/main/java/io/confluent/ksql/serde/json/KsqlJsonSerdeFactory.java +++ b/ksqldb-serde/src/main/java/io/confluent/ksql/serde/json/KsqlJsonSerdeFactory.java @@ -111,8 +111,7 @@ private Serializer createSerializer( : getConverter(); final ConnectDataTranslator dataTranslator = - physicalSchema.isPresent() - ? new ConnectSRSchemaDataTranslator(physicalSchema.get(), JsonSchemaFormat.NAME) + physicalSchema.isPresent() ? new ConnectSRSchemaDataTranslator(physicalSchema.get()) : new ConnectDataTranslator(schema); return new KsqlConnectSerializer<>( diff --git a/ksqldb-serde/src/main/java/io/confluent/ksql/serde/protobuf/ProtobufDataTranslator.java b/ksqldb-serde/src/main/java/io/confluent/ksql/serde/protobuf/ProtobufDataTranslator.java index a1c297d37039..e910714a5082 100644 --- a/ksqldb-serde/src/main/java/io/confluent/ksql/serde/protobuf/ProtobufDataTranslator.java +++ b/ksqldb-serde/src/main/java/io/confluent/ksql/serde/protobuf/ProtobufDataTranslator.java @@ -18,13 +18,7 @@ import io.confluent.ksql.serde.connect.ConnectDataTranslator; import io.confluent.ksql.serde.connect.DataTranslator; -import io.confluent.ksql.util.DecimalUtil; -import java.math.BigDecimal; -import java.util.ArrayList; -import java.util.HashMap; import java.util.Iterator; -import java.util.List; -import java.util.Map; import java.util.Objects; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; @@ -52,64 +46,32 @@ public Object toKsqlRow(final Schema connectSchema, final Object connectObject) return null; } - return replaceSchema(ksqlSchema, protoCompatibleRow); + return objectWithSchema(ksqlSchema, protoCompatibleRow); } @Override public Object toConnectRow(final Object ksqlData) { - final Object compatible = replaceSchema(protoCompatibleSchema, ksqlData); + final Object compatible = objectWithSchema(protoCompatibleSchema, ksqlData); return innerTranslator.toConnectRow(compatible); } - private static Struct convertStruct( - final Struct source, - final Schema targetSchema - ) { - final Struct struct = new Struct(targetSchema); + private static Object objectWithSchema(final Schema schema, final Object object) { + if (object == null || schema.type() != Schema.Type.STRUCT) { + return object; + } + + final Struct source = (Struct)object; + final Struct struct = new Struct(schema); final Iterator sourceIt = source.schema().fields().iterator(); - for (final Field targetField : targetSchema.fields()) { + for (final Field targetField : schema.fields()) { final Field sourceField = sourceIt.next(); final Object value = source.get(sourceField); - final Object adjusted = replaceSchema(targetField.schema(), value); - struct.put(targetField, adjusted); - } - - return struct; - } - @SuppressWarnings("unchecked") - private static Object replaceSchema(final Schema schema, final Object object) { - if (object == null) { - return null; + struct.put(targetField, value); } - switch (schema.type()) { - case ARRAY: - final List ksqlArray = new ArrayList<>(((List) object).size()); - ((List) object).forEach( - e -> ksqlArray.add(replaceSchema(schema.valueSchema(), e))); - return ksqlArray; - - case MAP: - final Map ksqlMap = new HashMap<>(); - ((Map) object).forEach( - (key, value) -> ksqlMap.put( - replaceSchema(schema.keySchema(), key), - replaceSchema(schema.valueSchema(), value) - )); - return ksqlMap; - case STRUCT: - return convertStruct((Struct) object, schema); - case BYTES: - if (DecimalUtil.isDecimal(schema)) { - return DecimalUtil.ensureFit((BigDecimal) object, schema); - } else { - return object; - } - default: - return object; - } + return struct; } } diff --git a/ksqldb-serde/src/main/java/io/confluent/ksql/serde/protobuf/ProtobufSchemas.java b/ksqldb-serde/src/main/java/io/confluent/ksql/serde/protobuf/ProtobufSchemas.java index abfbdfb663c1..ab7b0d5002be 100644 --- a/ksqldb-serde/src/main/java/io/confluent/ksql/serde/protobuf/ProtobufSchemas.java +++ b/ksqldb-serde/src/main/java/io/confluent/ksql/serde/protobuf/ProtobufSchemas.java @@ -24,27 +24,11 @@ private ProtobufSchemas() { } static Schema schemaWithName(final Schema schema, final String schemaName) { - if (schemaName == null) { + if (schemaName == null || schema.type() != Schema.Type.STRUCT) { return schema; } - final SchemaBuilder builder; - switch (schema.type()) { - case STRUCT: - builder = buildSchemaStruct(schema); - break; - case ARRAY: - builder = SchemaBuilder.array(schema.valueSchema()); - break; - case MAP: - builder = SchemaBuilder.map( - schema.keySchema(), - schema.valueSchema() - ); - break; - default: - builder = new SchemaBuilder(schema.type()); - } + final SchemaBuilder builder = buildSchemaStruct(schema); if (schema.parameters() != null) { builder.parameters(schema.parameters()); diff --git a/ksqldb-serde/src/main/java/io/confluent/ksql/serde/protobuf/ProtobufSerdeFactory.java b/ksqldb-serde/src/main/java/io/confluent/ksql/serde/protobuf/ProtobufSerdeFactory.java index 43337849fb02..65be4757b269 100644 --- a/ksqldb-serde/src/main/java/io/confluent/ksql/serde/protobuf/ProtobufSerdeFactory.java +++ b/ksqldb-serde/src/main/java/io/confluent/ksql/serde/protobuf/ProtobufSerdeFactory.java @@ -149,7 +149,7 @@ private DataTranslator getDataTranslator(final Schema schema, // deserialization, if physical schema exists, we use original schema to translate to ksql data. return physicalSchema.map( value -> isDeserializer ? new ConnectDataTranslator(schema) - : new ConnectSRSchemaDataTranslator(value, ProtobufFormat.NAME)) + : new ConnectSRSchemaDataTranslator(value)) .orElseGet(() -> { if (fullSchemaName.isPresent()) { return new ProtobufDataTranslator(schema, fullSchemaName.get()); diff --git a/ksqldb-serde/src/test/java/io/confluent/ksql/serde/avro/AvroSRSchemaDataTranslatorTest.java b/ksqldb-serde/src/test/java/io/confluent/ksql/serde/avro/AvroSRSchemaDataTranslatorTest.java new file mode 100644 index 000000000000..dd02847923e8 --- /dev/null +++ b/ksqldb-serde/src/test/java/io/confluent/ksql/serde/avro/AvroSRSchemaDataTranslatorTest.java @@ -0,0 +1,152 @@ +package io.confluent.ksql.serde.avro; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.sameInstance; +import static org.junit.Assert.assertThrows; + +import io.confluent.ksql.util.KsqlException; +import java.util.Collections; +import java.util.List; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.errors.DataException; +import org.junit.Test; + +public class AvroSRSchemaDataTranslatorTest { + + private static final Schema ORIGINAL_SCHEMA = SchemaBuilder.struct() + .field("f1", SchemaBuilder.OPTIONAL_STRING_SCHEMA) + .field("f2", SchemaBuilder.OPTIONAL_INT32_SCHEMA) + .build(); + + @Test + public void shouldTransformStruct() { + // Given: + final Schema schema = SchemaBuilder.struct() + .field("f1", SchemaBuilder.OPTIONAL_STRING_SCHEMA) + .field("f2", SchemaBuilder.OPTIONAL_INT32_SCHEMA) + .field("f3", SchemaBuilder.OPTIONAL_INT64_SCHEMA) + .build(); + final Struct struct = new Struct(ORIGINAL_SCHEMA) + .put("f1", "abc") + .put("f2", 12); + + // When: + final Object object = new AvroSRSchemaDataTranslator(schema).toConnectRow(struct); + + // Then: + assertThat(object, instanceOf(Struct.class)); + assertThat(((Struct) object).schema(), sameInstance(schema)); + assertThat(((Struct) object).get("f3"), is(nullValue())); + } + + @Test + public void shouldTransformStructWithDefaultValue() { + // Given: + final Schema schema = SchemaBuilder.struct() + .field("f1", SchemaBuilder.OPTIONAL_STRING_SCHEMA) + .field("f2", SchemaBuilder.OPTIONAL_INT32_SCHEMA) + .field("f3", SchemaBuilder.int64().defaultValue(123L)) + .build(); + final Struct struct = new Struct(ORIGINAL_SCHEMA) + .put("f1", "abc") + .put("f2", 12); + + // When: + final Object object = new AvroSRSchemaDataTranslator(schema).toConnectRow(struct); + + // Then: + assertThat(object, instanceOf(Struct.class)); + assertThat(((Struct) object).schema(), sameInstance(schema)); + assertThat(((Struct) object).get("f3"), is(123L)); + } + + @Test + public void shouldNotTransformOtherType() { + // Given: + final Schema schema = SchemaBuilder.struct() + .field("f1", SchemaBuilder.OPTIONAL_STRING_SCHEMA) + .field("f2", SchemaBuilder.OPTIONAL_INT32_SCHEMA) + .field("f3", SchemaBuilder.OPTIONAL_INT64_SCHEMA) + .build(); + final List list = Collections.emptyList(); + + // When: + final Object object = new AvroSRSchemaDataTranslator(schema).toConnectRow(list); + + // Then: + assertThat(object, sameInstance(list)); + } + + @Test + public void shouldThrowIfExtraFieldNotOptionalOrDefault() { + // Given: + final Schema schema = SchemaBuilder.struct() + .field("f1", SchemaBuilder.OPTIONAL_STRING_SCHEMA) + .field("f2", SchemaBuilder.OPTIONAL_INT32_SCHEMA) + .field("f3", SchemaBuilder.OPTIONAL_INT64_SCHEMA) + .field("f4", SchemaBuilder.STRING_SCHEMA) + .build(); + final Struct struct = new Struct(ORIGINAL_SCHEMA) + .put("f1", "abc") + .put("f2", 12); + + // When: + final Exception e = assertThrows( + KsqlException.class, + () -> new AvroSRSchemaDataTranslator(schema).toConnectRow(struct) + ); + + // Then: + assertThat(e.getMessage(), is("Missing default value for required Avro field: [f4]. " + + "This field appears in Avro schema in Schema Registry")); + } + + @Test + public void shouldThrowIfMissingField() { + // Given: + final Schema schema = SchemaBuilder.struct() + .field("f1", SchemaBuilder.OPTIONAL_STRING_SCHEMA) + .field("f3", SchemaBuilder.OPTIONAL_INT64_SCHEMA) + .build(); + final Struct struct = new Struct(ORIGINAL_SCHEMA) + .put("f1", "abc") + .put("f2", 12); + + // When: + final Exception e = assertThrows( + KsqlException.class, + () -> new AvroSRSchemaDataTranslator(schema).toConnectRow(struct) + ); + + // Then: + assertThat(e.getMessage(), is("Schema from Schema Registry misses field with name: f2")); + } + + @Test + public void shouldThrowIfConvertInvalidValue() { + // Given: + final Schema schema = SchemaBuilder.struct() + .field("f1", SchemaBuilder.STRING_SCHEMA) + .field("f2", SchemaBuilder.OPTIONAL_INT32_SCHEMA) + .field("f3", SchemaBuilder.OPTIONAL_INT64_SCHEMA) + .build(); + final Struct struct = new Struct(ORIGINAL_SCHEMA) + .put("f1", null) + .put("f2", 12); + + // When: + final Exception e = assertThrows( + DataException.class, + () -> new AvroSRSchemaDataTranslator(schema).toConnectRow(struct) + ); + + // Then: + assertThat(e.getMessage(), is("Invalid value: null used for required field: \"f1\", " + + "schema type: STRING")); + } +} \ No newline at end of file diff --git a/ksqldb-serde/src/test/java/io/confluent/ksql/serde/avro/KsqlAvroSerializerTest.java b/ksqldb-serde/src/test/java/io/confluent/ksql/serde/avro/KsqlAvroSerializerTest.java index 32efa02e7a03..b2288bf6135e 100644 --- a/ksqldb-serde/src/test/java/io/confluent/ksql/serde/avro/KsqlAvroSerializerTest.java +++ b/ksqldb-serde/src/test/java/io/confluent/ksql/serde/avro/KsqlAvroSerializerTest.java @@ -1318,8 +1318,8 @@ public void shouldThrowSerializeNonOptionalStruct() throws Exception { ); // Then: - assertThat(e.getMessage(), containsString("Missing default value for required " - + "field: [EXTRA]. This field appears in the schema in Schema Registry")); + assertThat(e.getMessage(), containsString("Missing default value for required Avro " + + "field: [EXTRA]. This field appears in Avro schema in Schema Registry")); } private static org.apache.avro.Schema legacyMapEntrySchema() { diff --git a/ksqldb-serde/src/test/java/io/confluent/ksql/serde/connect/ConnectSRSchemaDataTranslatorTest.java b/ksqldb-serde/src/test/java/io/confluent/ksql/serde/connect/ConnectSRSchemaDataTranslatorTest.java index 94abd6beff2b..00a6e4544e59 100644 --- a/ksqldb-serde/src/test/java/io/confluent/ksql/serde/connect/ConnectSRSchemaDataTranslatorTest.java +++ b/ksqldb-serde/src/test/java/io/confluent/ksql/serde/connect/ConnectSRSchemaDataTranslatorTest.java @@ -7,9 +7,6 @@ import static org.hamcrest.Matchers.sameInstance; import static org.junit.Assert.assertThrows; -import io.confluent.ksql.serde.avro.AvroFormat; -import io.confluent.ksql.serde.json.JsonFormat; -import io.confluent.ksql.serde.protobuf.ProtobufFormat; import io.confluent.ksql.util.KsqlException; import java.util.Collections; import java.util.List; @@ -39,8 +36,7 @@ public void shouldTransformStruct() { .put("f2", 12); // When: - final Object object = - new ConnectSRSchemaDataTranslator(schema, JsonFormat.NAME).toConnectRow(struct); + final Object object = new ConnectSRSchemaDataTranslator(schema).toConnectRow(struct); // Then: assertThat(object, instanceOf(Struct.class)); @@ -48,50 +44,6 @@ public void shouldTransformStruct() { assertThat(((Struct) object).get("f3"), is(nullValue())); } - @Test - public void shouldTransformStructWithAvroDefaultValue() { - // Given: - final Schema schema = SchemaBuilder.struct() - .field("f1", SchemaBuilder.OPTIONAL_STRING_SCHEMA) - .field("f2", SchemaBuilder.OPTIONAL_INT32_SCHEMA) - .field("f3", SchemaBuilder.int64().defaultValue(123L)) - .build(); - final Struct struct = new Struct(ORIGINAL_SCHEMA) - .put("f1", "abc") - .put("f2", 12); - - // When: - final Object object = - new ConnectSRSchemaDataTranslator(schema, AvroFormat.NAME).toConnectRow(struct); - - // Then: - assertThat(object, instanceOf(Struct.class)); - assertThat(((Struct) object).schema(), sameInstance(schema)); - assertThat(((Struct) object).get("f3"), is(123L)); - } - - @Test - public void shouldTransformStructWithProtobufDefaultValue() { - // Given: - final Schema schema = SchemaBuilder.struct() - .field("f1", SchemaBuilder.OPTIONAL_STRING_SCHEMA) - .field("f2", SchemaBuilder.OPTIONAL_INT32_SCHEMA) - .field("f3", SchemaBuilder.int64().defaultValue(123L)) - .build(); - final Struct struct = new Struct(ORIGINAL_SCHEMA) - .put("f1", "abc") - .put("f2", 12); - - // When: - final Object object = - new ConnectSRSchemaDataTranslator(schema, ProtobufFormat.NAME).toConnectRow(struct); - - // Then: - assertThat(object, instanceOf(Struct.class)); - assertThat(((Struct) object).schema(), sameInstance(schema)); - assertThat(((Struct) object).get("f3"), is(123L)); - } - @Test public void shouldNotTransformOtherType() { // Given: @@ -103,63 +55,12 @@ public void shouldNotTransformOtherType() { final List list = Collections.emptyList(); // When: - final Object object = - new ConnectSRSchemaDataTranslator(schema, JsonFormat.NAME).toConnectRow(list); + final Object object = new ConnectSRSchemaDataTranslator(schema).toConnectRow(list); // Then: assertThat(object, sameInstance(list)); } - @Test - public void shouldThrowIfExtraAvroFieldNotOptionalOrDefault() { - // Given: - final Schema schema = SchemaBuilder.struct() - .field("f1", SchemaBuilder.OPTIONAL_STRING_SCHEMA) - .field("f2", SchemaBuilder.OPTIONAL_INT32_SCHEMA) - .field("f3", SchemaBuilder.OPTIONAL_INT64_SCHEMA) - .field("f4", SchemaBuilder.STRING_SCHEMA) - .build(); - final Struct struct = new Struct(ORIGINAL_SCHEMA) - .put("f1", "abc") - .put("f2", 12); - - // When: - final Exception e = assertThrows( - KsqlException.class, - () -> new ConnectSRSchemaDataTranslator(schema, AvroFormat.NAME).toConnectRow(struct) - - ); - - // Then: - assertThat(e.getMessage(), is("Missing default value for required field: [f4]. " - + "This field appears in the schema in Schema Registry")); - } - - @Test - public void shouldThrowIfExtraProtobufFieldNotOptionalOrDefault() { - // Given: - final Schema schema = SchemaBuilder.struct() - .field("f1", SchemaBuilder.OPTIONAL_STRING_SCHEMA) - .field("f2", SchemaBuilder.OPTIONAL_INT32_SCHEMA) - .field("f3", SchemaBuilder.OPTIONAL_INT64_SCHEMA) - .field("f4", SchemaBuilder.STRING_SCHEMA) - .build(); - final Struct struct = new Struct(ORIGINAL_SCHEMA) - .put("f1", "abc") - .put("f2", 12); - - // When: - final Exception e = assertThrows( - KsqlException.class, - () -> new ConnectSRSchemaDataTranslator(schema, ProtobufFormat.NAME).toConnectRow(struct) - - ); - - // Then: - assertThat(e.getMessage(), is("Missing default value for required field: [f4]. " - + "This field appears in the schema in Schema Registry")); - } - @Test public void shouldThrowIfMissingField() { // Given: @@ -174,7 +75,7 @@ public void shouldThrowIfMissingField() { // When: final Exception e = assertThrows( KsqlException.class, - () -> new ConnectSRSchemaDataTranslator(schema, JsonFormat.NAME).toConnectRow(struct) + () -> new ConnectSRSchemaDataTranslator(schema).toConnectRow(struct) ); // Then: @@ -196,7 +97,7 @@ public void shouldThrowIfConvertInvalidValue() { // When: final Exception e = assertThrows( DataException.class, - () -> new ConnectSRSchemaDataTranslator(schema, JsonFormat.NAME).toConnectRow(struct) + () -> new ConnectSRSchemaDataTranslator(schema).toConnectRow(struct) ); // Then: diff --git a/ksqldb-serde/src/test/java/io/confluent/ksql/serde/protobuf/ProtobufSchemasTest.java b/ksqldb-serde/src/test/java/io/confluent/ksql/serde/protobuf/ProtobufSchemasTest.java index 73ce21011514..1e835a70f282 100644 --- a/ksqldb-serde/src/test/java/io/confluent/ksql/serde/protobuf/ProtobufSchemasTest.java +++ b/ksqldb-serde/src/test/java/io/confluent/ksql/serde/protobuf/ProtobufSchemasTest.java @@ -99,34 +99,4 @@ public void shouldReturnSameSchemaOnNullNewName() { .build()) .build())); } - - @Test - public void shouldNameSchemaWithArray() { - // Given - final Schema namedSchema = SchemaBuilder.array(Schema.INT32_SCHEMA).build(); - - // When - final Schema schemaWithNewName = ProtobufSchemas.schemaWithName(namedSchema, CUSTOM_FULL_SCHEMA_NAME); - - // Then - assertThat(schemaWithNewName, is(SchemaBuilder - .array(Schema.INT32_SCHEMA) - .name(CUSTOM_FULL_SCHEMA_NAME) - .build())); - } - - @Test - public void shouldNameSchemaWithMap() { -// Given - final Schema namedSchema = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build(); - - // When - final Schema schemaWithNewName = ProtobufSchemas.schemaWithName(namedSchema, CUSTOM_FULL_SCHEMA_NAME); - - // Then - assertThat(schemaWithNewName, is(SchemaBuilder - .map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA) - .name(CUSTOM_FULL_SCHEMA_NAME) - .build())); - } }