diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/elements.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/elements.json index 18c5aac3668f..57e7e661c703 100644 --- a/ksqldb-functional-tests/src/test/resources/query-validation-tests/elements.json +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/elements.json @@ -1258,7 +1258,7 @@ "outputs": [{"topic": "OUTPUT", "key": 42, "value": {"c1": 4}}], "expectedException": { "type": "org.apache.kafka.streams.errors.StreamsException", - "message": "Error serializing message to topic: OUTPUT. Missing default value for required Avro field: [c2]." + "message": "Error serializing message to topic: OUTPUT. Missing default value for required field: [c2]." } }, { diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/InsertValuesExecutor.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/InsertValuesExecutor.java index 659946cc7c29..d8652d06e0de 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/InsertValuesExecutor.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/InsertValuesExecutor.java @@ -367,14 +367,23 @@ private static void ensureKeySchemasMatch( return; } + final SchemaRegistryClient schemaRegistryClient = serviceContext.getSchemaRegistryClient(); + + final FormatInfo formatInfo = addSerializerMissingFormatFields( + schemaRegistryClient, + dataSource.getKsqlTopic().getValueFormat().getFormatInfo(), + dataSource.getKafkaTopicName(), + true + ); + final ParsedSchema schema = format - .getSchemaTranslator(keyFormat.getFormatInfo().getProperties()) + .getSchemaTranslator(formatInfo.getProperties()) .toParsedSchema(keySchema); final Optional latest; try { latest = SchemaRegistryUtil.getLatestSchema( - serviceContext.getSchemaRegistryClient(), + schemaRegistryClient, dataSource.getKafkaTopicName(), true); @@ -465,7 +474,7 @@ private byte[] serializeValue( * they were created previous to this fix. Those previous streams would fail with INSERT. * The best option was to dynamically look at the SR schema during an INSERT statement. */ - private FormatInfo addSerializerMissingFormatFields( + private static FormatInfo addSerializerMissingFormatFields( final SchemaRegistryClient srClient, final FormatInfo formatInfo, final String topicName, @@ -480,19 +489,17 @@ private FormatInfo addSerializerMissingFormatFields( final Set supportedProperties = format.getSupportedProperties(); - // So far, the only missing required field is the FULL_SCHEMA_NAME - // This field allows the serializer to check the SR schema for compatibility - if (!supportedProperties.contains(ConnectProperties.FULL_SCHEMA_NAME)) { - return formatInfo; - } - final ImmutableMap.Builder propertiesBuilder = ImmutableMap.builder(); propertiesBuilder.putAll(formatInfo.getProperties()); - // Add the FULL_SCHEMA_NAME if found on SR - SchemaRegistryUtil.getParsedSchema(srClient, topicName, isKey).map(ParsedSchema::name) - .ifPresent(schemaName -> - propertiesBuilder.put(ConnectProperties.FULL_SCHEMA_NAME, schemaName)); + // The FULL_SCHEMA_NAME allows the serializer to choose the schema definition + if (supportedProperties.contains(ConnectProperties.FULL_SCHEMA_NAME)) { + if (!formatInfo.getProperties().containsKey(ConnectProperties.FULL_SCHEMA_NAME)) { + SchemaRegistryUtil.getParsedSchema(srClient, topicName, isKey).map(ParsedSchema::name) + .ifPresent(schemaName -> + propertiesBuilder.put(ConnectProperties.FULL_SCHEMA_NAME, schemaName)); + } + } return FormatInfo.of(formatInfo.getFormat(), propertiesBuilder.build()); } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/InsertValuesExecutorTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/InsertValuesExecutorTest.java index caf0de2d5205..a7e934c5e5da 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/InsertValuesExecutorTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/InsertValuesExecutorTest.java @@ -1042,7 +1042,10 @@ public void shouldSupportInsertIntoWithSchemaInferenceMatch() throws Exception { @Test public void shouldThrowOnSchemaInferenceMismatchForKey() throws Exception { // Given: - when(srClient.getLatestSchemaMetadata(Mockito.any())).thenReturn(new SchemaMetadata(1, 1, "schema")); + when(srClient.getLatestSchemaMetadata(Mockito.any())) + .thenReturn(new SchemaMetadata(1, 1, "schema")); + when(srClient.getSchemaById(1)) + .thenReturn(new AvroSchema(RAW_SCHEMA)); givenDataSourceWithSchema( TOPIC_NAME, SCHEMA, @@ -1074,11 +1077,18 @@ public void shouldThrowOnSchemaInferenceMismatchForKey() throws Exception { @Test public void shouldBuildSerdeWithSchemaFullName() throws Exception { + final String AVRO_SCHEMA = "{\"type\":\"record\"," + + "\"name\":\"TestSchema\"," + + "\"namespace\":\"io.avro\"," + + "\"fields\":[" + + "{\"name\":\"k0\",\"type\":[\"null\",\"string\"],\"default\":null}," + + "{\"name\":\"k1\",\"type\":[\"null\",\"string\"],\"default\":null}]}"; + // Given: when(srClient.getLatestSchemaMetadata(Mockito.any())) .thenReturn(new SchemaMetadata(1, 1, "\"string\"")); when(srClient.getSchemaById(1)) - .thenReturn(new AvroSchema(RAW_SCHEMA)); + .thenReturn(new AvroSchema(AVRO_SCHEMA)); givenDataSourceWithSchema( TOPIC_NAME, SCHEMA, @@ -1103,7 +1113,7 @@ public void shouldBuildSerdeWithSchemaFullName() throws Exception { // Then: verify(keySerdeFactory).create( FormatInfo.of(FormatFactory.AVRO.name(), ImmutableMap.of( - AvroProperties.FULL_SCHEMA_NAME,"io.confluent.ksql.avro_schemas.KsqlDataSourceSchema" + AvroProperties.FULL_SCHEMA_NAME,"io.avro.TestSchema" )), PersistenceSchema.from(SCHEMA.key(), SerdeFeatures.of(SerdeFeature.UNWRAP_SINGLES)), new KsqlConfig(ImmutableMap.of()), @@ -1115,7 +1125,7 @@ public void shouldBuildSerdeWithSchemaFullName() throws Exception { verify(valueSerdeFactory).create( FormatInfo.of(FormatFactory.AVRO.name(), ImmutableMap.of( - AvroProperties.FULL_SCHEMA_NAME,"io.confluent.ksql.avro_schemas.KsqlDataSourceSchema" + AvroProperties.FULL_SCHEMA_NAME,"io.avro.TestSchema" )), PersistenceSchema.from(SCHEMA.value(), SerdeFeatures.of()), new KsqlConfig(ImmutableMap.of()), @@ -1166,7 +1176,7 @@ public void shouldBuildCorrectSerde() { public void shouldThrowWhenNotAuthorizedToReadKeySchemaToSR() throws Exception { // Given: when(srClient.getLatestSchemaMetadata(Mockito.any())) - .thenThrow(new RestClientException("foo", 401, 1)); + .thenThrow(new RestClientException("User is denied operation Read on foo-key", 401, 1)); givenDataSourceWithSchema( TOPIC_NAME, SCHEMA, diff --git a/ksqldb-serde/src/main/java/io/confluent/ksql/serde/protobuf/ProtobufSchemaTranslator.java b/ksqldb-serde/src/main/java/io/confluent/ksql/serde/protobuf/ProtobufSchemaTranslator.java index c9a73596bca6..1f92d8259724 100644 --- a/ksqldb-serde/src/main/java/io/confluent/ksql/serde/protobuf/ProtobufSchemaTranslator.java +++ b/ksqldb-serde/src/main/java/io/confluent/ksql/serde/protobuf/ProtobufSchemaTranslator.java @@ -17,15 +17,18 @@ import static io.confluent.connect.protobuf.ProtobufDataConfig.WRAPPER_FOR_RAW_PRIMITIVES_CONFIG; +import com.google.common.base.Strings; import com.google.common.collect.ImmutableMap; import io.confluent.connect.protobuf.ProtobufData; import io.confluent.connect.protobuf.ProtobufDataConfig; import io.confluent.kafka.schemaregistry.ParsedSchema; import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema; +import io.confluent.ksql.serde.SchemaFullNameAppender; import io.confluent.ksql.serde.connect.ConnectSchemaTranslator; import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.Optional; import org.apache.kafka.connect.data.Schema; /** @@ -35,6 +38,7 @@ class ProtobufSchemaTranslator implements ConnectSchemaTranslator { private final ProtobufProperties properties; private final Map baseConfigs; + private final Optional fullNameSchema; private Map updatedConfigs; private ProtobufData protobufData; @@ -43,7 +47,9 @@ class ProtobufSchemaTranslator implements ConnectSchemaTranslator { this.properties = Objects.requireNonNull(properties, "properties"); this.baseConfigs = ImmutableMap.of( WRAPPER_FOR_RAW_PRIMITIVES_CONFIG, properties.getUnwrapPrimitives()); - + this.fullNameSchema = Optional.ofNullable( + Strings.emptyToNull(Strings.nullToEmpty(properties.getFullSchemaName()).trim()) + ); this.updatedConfigs = baseConfigs; this.protobufData = new ProtobufData(new ProtobufDataConfig(baseConfigs)); } @@ -71,6 +77,13 @@ public Schema toConnectSchema(final ParsedSchema schema) { public ParsedSchema fromConnectSchema(final Schema schema) { // Bug in ProtobufData means `fromConnectSchema` throws on the second invocation if using // default naming. - return new ProtobufData(new ProtobufDataConfig(updatedConfigs)).fromConnectSchema(schema); + return new ProtobufData(new ProtobufDataConfig(updatedConfigs)) + .fromConnectSchema(injectSchemaFullName(schema)); + } + + private Schema injectSchemaFullName(final Schema origSchema) { + return fullNameSchema + .map(fullName -> SchemaFullNameAppender.appendSchemaFullName(origSchema, fullName)) + .orElse(origSchema); } } diff --git a/ksqldb-serde/src/test/java/io/confluent/ksql/serde/SchemaFullNameAppenderTest.java b/ksqldb-serde/src/test/java/io/confluent/ksql/serde/SchemaFullNameAppenderTest.java new file mode 100644 index 000000000000..980c7e886aa1 --- /dev/null +++ b/ksqldb-serde/src/test/java/io/confluent/ksql/serde/SchemaFullNameAppenderTest.java @@ -0,0 +1,101 @@ +/* + * Copyright 2022 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.serde; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class SchemaFullNameAppenderTest { + private static final String TEST_NAME = "test"; + + @Test + public void testAppenderReturnsSingleStructSchemaWithFullName() { + // Given + final Schema unNamedSchema = SchemaBuilder.struct().build(); + + // When + final Schema schema = SchemaFullNameAppender.appendSchemaFullName(unNamedSchema, TEST_NAME); + + // Then + assertThat(schema, is(SchemaBuilder.struct().name(TEST_NAME).build())); + } + + @Test + public void testAppenderReturnsStructSchemaInArrayWithFullName() { + // Given + final Schema unNamedSchema = SchemaBuilder.array( + SchemaBuilder.struct().build() + ).build(); + + // When + final Schema schema = SchemaFullNameAppender.appendSchemaFullName(unNamedSchema, TEST_NAME); + + // Then + assertThat(schema, is(SchemaBuilder.array( + SchemaBuilder.struct().name(TEST_NAME).build() + ).build())); + } + + @Test + public void testAppenderReturnsMapWithFullName() { + // Given + final Schema unNamedSchema = SchemaBuilder.map( + SchemaBuilder.struct().build(), + SchemaBuilder.struct().build() + ); + + // When + final Schema schema = SchemaFullNameAppender.appendSchemaFullName(unNamedSchema, TEST_NAME); + + // Then + assertThat(schema, is(SchemaBuilder.map( + SchemaBuilder.struct().name(TEST_NAME + "_MapKey").build(), + SchemaBuilder.struct().name(TEST_NAME + "_MapValue").build() + ).name(TEST_NAME).build())); + } + + @Test + public void testReplacesInvalidDottedFieldNamesToValidFieldNames() { + // Given + final Schema unNamedSchema = SchemaBuilder.struct() + .field("internal.struct", + SchemaBuilder.struct() + .field("product.id", Schema.INT32_SCHEMA) + .build()) + .build(); + + // When + final Schema schema = SchemaFullNameAppender.appendSchemaFullName(unNamedSchema, TEST_NAME); + + // Then + assertThat(schema, is(SchemaBuilder.struct() + .field("internal_struct", + SchemaBuilder.struct() + .field("product_id", Schema.INT32_SCHEMA) + .name(TEST_NAME + "_internal.struct") + .build()) + .name(TEST_NAME) + .build())); + + } +}