Commit 50b15d2
Update InsertValuesExecutor test and add SchemaFullNameAppender tests
ConfluentJenkins committed Apr 8, 2022
1 parent f66934a commit 50b15d2
Showing 5 changed files with 152 additions and 21 deletions.
File 1 of 5: JSON query-validation test plan (file path not shown in this view; the file labels below are inferred from the class and method names visible in each hunk):
@@ -1258,7 +1258,7 @@
       "outputs": [{"topic": "OUTPUT", "key": 42, "value": {"c1": 4}}],
       "expectedException": {
         "type": "org.apache.kafka.streams.errors.StreamsException",
-        "message": "Error serializing message to topic: OUTPUT. Missing default value for required Avro field: [c2]."
+        "message": "Error serializing message to topic: OUTPUT. Missing default value for required field: [c2]."
       }
     },
     {
File 2 of 5: InsertValuesExecutor.java:
@@ -367,14 +367,23 @@ private static void ensureKeySchemasMatch(
       return;
     }
 
+    final SchemaRegistryClient schemaRegistryClient = serviceContext.getSchemaRegistryClient();
+
+    final FormatInfo formatInfo = addSerializerMissingFormatFields(
+        schemaRegistryClient,
+        dataSource.getKsqlTopic().getValueFormat().getFormatInfo(),
+        dataSource.getKafkaTopicName(),
+        true
+    );
+
     final ParsedSchema schema = format
-        .getSchemaTranslator(keyFormat.getFormatInfo().getProperties())
+        .getSchemaTranslator(formatInfo.getProperties())
         .toParsedSchema(keySchema);
 
     final Optional<SchemaMetadata> latest;
     try {
       latest = SchemaRegistryUtil.getLatestSchema(
-          serviceContext.getSchemaRegistryClient(),
+          schemaRegistryClient,
           dataSource.getKafkaTopicName(),
           true);
 
@@ -465,7 +474,7 @@ private byte[] serializeValue(
    * they were created previous to this fix. Those previous streams would fail with INSERT.
    * The best option was to dynamically look at the SR schema during an INSERT statement.
    */
-  private FormatInfo addSerializerMissingFormatFields(
+  private static FormatInfo addSerializerMissingFormatFields(
       final SchemaRegistryClient srClient,
       final FormatInfo formatInfo,
       final String topicName,
@@ -480,19 +489,17 @@ private FormatInfo addSerializerMissingFormatFields(
 
     final Set<String> supportedProperties = format.getSupportedProperties();
 
-    // So far, the only missing required field is the FULL_SCHEMA_NAME
-    // This field allows the serializer to check the SR schema for compatibility
-    if (!supportedProperties.contains(ConnectProperties.FULL_SCHEMA_NAME)) {
-      return formatInfo;
-    }
-
     final ImmutableMap.Builder propertiesBuilder = ImmutableMap.builder();
     propertiesBuilder.putAll(formatInfo.getProperties());
 
-    // Add the FULL_SCHEMA_NAME if found on SR
-    SchemaRegistryUtil.getParsedSchema(srClient, topicName, isKey).map(ParsedSchema::name)
-        .ifPresent(schemaName ->
-            propertiesBuilder.put(ConnectProperties.FULL_SCHEMA_NAME, schemaName));
+    // The FULL_SCHEMA_NAME allows the serializer to choose the schema definition
+    if (supportedProperties.contains(ConnectProperties.FULL_SCHEMA_NAME)) {
+      if (!formatInfo.getProperties().containsKey(ConnectProperties.FULL_SCHEMA_NAME)) {
+        SchemaRegistryUtil.getParsedSchema(srClient, topicName, isKey).map(ParsedSchema::name)
+            .ifPresent(schemaName ->
+                propertiesBuilder.put(ConnectProperties.FULL_SCHEMA_NAME, schemaName));
+      }
+    }
 
     return FormatInfo.of(formatInfo.getFormat(), propertiesBuilder.build());
   }
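Net effect of the rewrite above: FULL_SCHEMA_NAME is now copied from the latest Schema Registry schema only when the format supports the property and the statement did not already carry a value, so a user-supplied schema name is never overwritten (with Guava's ImmutableMap.Builder, putting the same key twice would fail at build()). A minimal standalone sketch of that resolution rule, under the assumption that plain Java maps stand in for FormatInfo and that the Schema Registry lookup result is passed in as an Optional:

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

final class FullSchemaNameResolutionSketch {

  // Stand-in for ConnectProperties.FULL_SCHEMA_NAME.
  static final String FULL_SCHEMA_NAME = "fullSchemaName";

  // Mirrors the double guard above: fill the property only if the format
  // supports it AND the caller has not already provided a value.
  static Map<String, String> resolve(
      final Map<String, String> formatProperties,
      final Set<String> supportedProperties,
      final Optional<String> schemaNameFromSr
  ) {
    final Map<String, String> resolved = new HashMap<>(formatProperties);
    if (supportedProperties.contains(FULL_SCHEMA_NAME)
        && !formatProperties.containsKey(FULL_SCHEMA_NAME)) {
      schemaNameFromSr.ifPresent(name -> resolved.put(FULL_SCHEMA_NAME, name));
    }
    return resolved;
  }
}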
File 3 of 5: InsertValuesExecutorTest.java:
@@ -1042,7 +1042,10 @@ public void shouldSupportInsertIntoWithSchemaInferenceMatch() throws Exception {
   @Test
   public void shouldThrowOnSchemaInferenceMismatchForKey() throws Exception {
     // Given:
-    when(srClient.getLatestSchemaMetadata(Mockito.any())).thenReturn(new SchemaMetadata(1, 1, "schema"));
+    when(srClient.getLatestSchemaMetadata(Mockito.any()))
+        .thenReturn(new SchemaMetadata(1, 1, "schema"));
+    when(srClient.getSchemaById(1))
+        .thenReturn(new AvroSchema(RAW_SCHEMA));
     givenDataSourceWithSchema(
         TOPIC_NAME,
         SCHEMA,
@@ -1074,11 +1077,18 @@ public void shouldThrowOnSchemaInferenceMismatchForKey() throws Exception {
 
   @Test
   public void shouldBuildSerdeWithSchemaFullName() throws Exception {
+    final String AVRO_SCHEMA = "{\"type\":\"record\","
+        + "\"name\":\"TestSchema\","
+        + "\"namespace\":\"io.avro\","
+        + "\"fields\":["
+        + "{\"name\":\"k0\",\"type\":[\"null\",\"string\"],\"default\":null},"
+        + "{\"name\":\"k1\",\"type\":[\"null\",\"string\"],\"default\":null}]}";
+
     // Given:
     when(srClient.getLatestSchemaMetadata(Mockito.any()))
         .thenReturn(new SchemaMetadata(1, 1, "\"string\""));
     when(srClient.getSchemaById(1))
-        .thenReturn(new AvroSchema(RAW_SCHEMA));
+        .thenReturn(new AvroSchema(AVRO_SCHEMA));
     givenDataSourceWithSchema(
         TOPIC_NAME,
         SCHEMA,
@@ -1103,7 +1113,7 @@ public void shouldBuildSerdeWithSchemaFullName() throws Exception {
     // Then:
     verify(keySerdeFactory).create(
         FormatInfo.of(FormatFactory.AVRO.name(), ImmutableMap.of(
-            AvroProperties.FULL_SCHEMA_NAME,"io.confluent.ksql.avro_schemas.KsqlDataSourceSchema"
+            AvroProperties.FULL_SCHEMA_NAME,"io.avro.TestSchema"
         )),
         PersistenceSchema.from(SCHEMA.key(), SerdeFeatures.of(SerdeFeature.UNWRAP_SINGLES)),
         new KsqlConfig(ImmutableMap.of()),
@@ -1115,7 +1125,7 @@ public void shouldBuildSerdeWithSchemaFullName() throws Exception {
 
     verify(valueSerdeFactory).create(
         FormatInfo.of(FormatFactory.AVRO.name(), ImmutableMap.of(
-            AvroProperties.FULL_SCHEMA_NAME,"io.confluent.ksql.avro_schemas.KsqlDataSourceSchema"
+            AvroProperties.FULL_SCHEMA_NAME,"io.avro.TestSchema"
         )),
         PersistenceSchema.from(SCHEMA.value(), SerdeFeatures.of()),
         new KsqlConfig(ImmutableMap.of()),
@@ -1166,7 +1176,7 @@ public void shouldBuildCorrectSerde() {
   public void shouldThrowWhenNotAuthorizedToReadKeySchemaToSR() throws Exception {
     // Given:
     when(srClient.getLatestSchemaMetadata(Mockito.any()))
-        .thenThrow(new RestClientException("foo", 401, 1));
+        .thenThrow(new RestClientException("User is denied operation Read on foo-key", 401, 1));
     givenDataSourceWithSchema(
         TOPIC_NAME,
         SCHEMA,
File 4 of 5: ProtobufSchemaTranslator.java:
@@ -17,15 +17,18 @@
 
 import static io.confluent.connect.protobuf.ProtobufDataConfig.WRAPPER_FOR_RAW_PRIMITIVES_CONFIG;
 
+import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableMap;
 import io.confluent.connect.protobuf.ProtobufData;
 import io.confluent.connect.protobuf.ProtobufDataConfig;
 import io.confluent.kafka.schemaregistry.ParsedSchema;
 import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
+import io.confluent.ksql.serde.SchemaFullNameAppender;
 import io.confluent.ksql.serde.connect.ConnectSchemaTranslator;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import org.apache.kafka.connect.data.Schema;
 
 /**
@@ -35,6 +38,7 @@ class ProtobufSchemaTranslator implements ConnectSchemaTranslator {
 
   private final ProtobufProperties properties;
   private final Map<String, Object> baseConfigs;
+  private final Optional<String> fullNameSchema;
 
   private Map<String, Object> updatedConfigs;
   private ProtobufData protobufData;
@@ -43,7 +47,9 @@ class ProtobufSchemaTranslator implements ConnectSchemaTranslator {
     this.properties = Objects.requireNonNull(properties, "properties");
     this.baseConfigs = ImmutableMap.of(
         WRAPPER_FOR_RAW_PRIMITIVES_CONFIG, properties.getUnwrapPrimitives());
-
+    this.fullNameSchema = Optional.ofNullable(
+        Strings.emptyToNull(Strings.nullToEmpty(properties.getFullSchemaName()).trim())
+    );
     this.updatedConfigs = baseConfigs;
     this.protobufData = new ProtobufData(new ProtobufDataConfig(baseConfigs));
   }
@@ -71,6 +77,13 @@ public Schema toConnectSchema(final ParsedSchema schema) {
   public ParsedSchema fromConnectSchema(final Schema schema) {
     // Bug in ProtobufData means `fromConnectSchema` throws on the second invocation if using
     // default naming.
-    return new ProtobufData(new ProtobufDataConfig(updatedConfigs)).fromConnectSchema(schema);
+    return new ProtobufData(new ProtobufDataConfig(updatedConfigs))
+        .fromConnectSchema(injectSchemaFullName(schema));
+  }
+
+  private Schema injectSchemaFullName(final Schema origSchema) {
+    return fullNameSchema
+        .map(fullName -> SchemaFullNameAppender.appendSchemaFullName(origSchema, fullName))
+        .orElse(origSchema);
   }
 }
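The fullNameSchema normalization in the constructor collapses null, empty, and whitespace-only names into Optional.empty(), so injectSchemaFullName becomes a no-op unless a real schema name was configured. A self-contained illustration of that exact expression (the demo class and method names here are invented):

import com.google.common.base.Strings;
import java.util.Optional;

public final class FullNameNormalizationDemo {

  // Same expression as ProtobufSchemaTranslator's constructor.
  static Optional<String> normalize(final String fullSchemaName) {
    return Optional.ofNullable(
        Strings.emptyToNull(Strings.nullToEmpty(fullSchemaName).trim()));
  }

  public static void main(final String[] args) {
    System.out.println(normalize(null));          // Optional.empty
    System.out.println(normalize(""));            // Optional.empty
    System.out.println(normalize("   "));         // Optional.empty
    System.out.println(normalize(" io.avro.T ")); // Optional[io.avro.T], trimmed
  }
}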
File 5 of 5: SchemaFullNameAppenderTest.java (new file):
@@ -0,0 +1,101 @@
/*
* Copyright 2022 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.serde;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
public class SchemaFullNameAppenderTest {
private static final String TEST_NAME = "test";

@Test
public void testAppenderReturnsSingleStructSchemaWithFullName() {
// Given
final Schema unNamedSchema = SchemaBuilder.struct().build();

// When
final Schema schema = SchemaFullNameAppender.appendSchemaFullName(unNamedSchema, TEST_NAME);

// Then
assertThat(schema, is(SchemaBuilder.struct().name(TEST_NAME).build()));
}

@Test
public void testAppenderReturnsStructSchemaInArrayWithFullName() {
// Given
final Schema unNamedSchema = SchemaBuilder.array(
SchemaBuilder.struct().build()
).build();

// When
final Schema schema = SchemaFullNameAppender.appendSchemaFullName(unNamedSchema, TEST_NAME);

// Then
assertThat(schema, is(SchemaBuilder.array(
SchemaBuilder.struct().name(TEST_NAME).build()
).build()));
}

@Test
public void testAppenderReturnsMapWithFullName() {
// Given
final Schema unNamedSchema = SchemaBuilder.map(
SchemaBuilder.struct().build(),
SchemaBuilder.struct().build()
);

// When
final Schema schema = SchemaFullNameAppender.appendSchemaFullName(unNamedSchema, TEST_NAME);

// Then
assertThat(schema, is(SchemaBuilder.map(
SchemaBuilder.struct().name(TEST_NAME + "_MapKey").build(),
SchemaBuilder.struct().name(TEST_NAME + "_MapValue").build()
).name(TEST_NAME).build()));
}

@Test
public void testReplacesInvalidDottedFieldNamesToValidFieldNames() {
// Given
final Schema unNamedSchema = SchemaBuilder.struct()
.field("internal.struct",
SchemaBuilder.struct()
.field("product.id", Schema.INT32_SCHEMA)
.build())
.build();

// When
final Schema schema = SchemaFullNameAppender.appendSchemaFullName(unNamedSchema, TEST_NAME);

// Then
assertThat(schema, is(SchemaBuilder.struct()
.field("internal_struct",
SchemaBuilder.struct()
.field("product_id", Schema.INT32_SCHEMA)
.name(TEST_NAME + "_internal.struct")
.build())
.name(TEST_NAME)
.build()));

}
}
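SchemaFullNameAppender itself is not part of this diff. Based solely on the expectations encoded in the four tests above, an implementation sketch consistent with them might look like the following; this is an assumed reconstruction for illustration, not the actual ksqlDB class, and it ignores optionality, defaults, and doc fields for brevity:

import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

final class SchemaFullNameAppenderSketch {

  static Schema appendSchemaFullName(final Schema schema, final String fullName) {
    switch (schema.type()) {
      case STRUCT: {
        final SchemaBuilder struct = SchemaBuilder.struct().name(fullName);
        for (final Field field : schema.fields()) {
          // Dotted field names become valid identifiers ("product.id" -> "product_id"),
          // while the nested schema's name keeps the original field name as a suffix.
          struct.field(
              field.name().replace('.', '_'),
              appendSchemaFullName(field.schema(), fullName + "_" + field.name()));
        }
        return struct.build();
      }
      case ARRAY:
        // Arrays stay unnamed; the full name is pushed down to the element schema.
        return SchemaBuilder.array(
            appendSchemaFullName(schema.valueSchema(), fullName)).build();
      case MAP:
        // Maps are named, and the key/value schemas get "_MapKey"/"_MapValue" suffixes.
        return SchemaBuilder.map(
            appendSchemaFullName(schema.keySchema(), fullName + "_MapKey"),
            appendSchemaFullName(schema.valueSchema(), fullName + "_MapValue"))
            .name(fullName)
            .build();
      default:
        // Primitive schemas pass through untouched.
        return schema;
    }
  }
}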
