Skip to content

Commit

Permalink
Return to AvroSRSchemaDataTranslator and use ConnectSRSchemaDataTranslator for Protobuf/Json
Browse files Browse the repository at this point in the history
  • Loading branch information
spena committed Apr 12, 2022
1 parent ab8636c commit 1c3c8b2
Show file tree
Hide file tree
Showing 14 changed files with 275 additions and 309 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,22 +123,26 @@ public static Optional<ParsedSchema> getParsedSchema(
try {
return Optional.of(srClient.getSchemaById(schemaId));
} catch (final Exception e) {
if (isAuthErrorCode(e)) {
final AclOperation deniedOperation = SchemaRegistryUtil.getDeniedOperation(e.getMessage());

if (deniedOperation != AclOperation.UNKNOWN) {
throw new KsqlSchemaAuthorizationException(
deniedOperation,
subject
);
}
}
throwOnAuthError(e, subject);

throw new KsqlException(
"Could not get schema for subject " + subject + " and id " + schemaId, e);
}
}

/**
 * Converts a Schema Registry authorization failure into a
 * {@link KsqlSchemaAuthorizationException} identifying the denied ACL operation.
 *
 * <p>Does nothing when {@code e} is not an auth error, or when the denied
 * operation cannot be determined from the error message.
 *
 * @param e the exception returned by the Schema Registry client
 * @param subject the Schema Registry subject being accessed
 */
private static void throwOnAuthError(final Exception e, final String subject) {
  if (!isAuthErrorCode(e)) {
    return;
  }

  final AclOperation denied = SchemaRegistryUtil.getDeniedOperation(e.getMessage());
  if (denied == AclOperation.UNKNOWN) {
    // Auth error, but we cannot tell which operation was denied; let the caller
    // raise its generic KsqlException instead.
    return;
  }

  throw new KsqlSchemaAuthorizationException(denied, subject);
}

public static Optional<SchemaMetadata> getLatestSchema(
final SchemaRegistryClient srClient,
final String topic,
Expand All @@ -160,16 +164,7 @@ public static Optional<SchemaMetadata> getLatestSchema(
return Optional.empty();
}

if (isAuthErrorCode(e)) {
final AclOperation deniedOperation = SchemaRegistryUtil.getDeniedOperation(e.getMessage());

if (deniedOperation != AclOperation.UNKNOWN) {
throw new KsqlSchemaAuthorizationException(
deniedOperation,
subject
);
}
}
throwOnAuthError(e, subject);

throw new KsqlException("Could not get latest schema for subject " + subject, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1258,7 +1258,7 @@
"outputs": [{"topic": "OUTPUT", "key": 42, "value": {"c1": 4}}],
"expectedException": {
"type": "org.apache.kafka.streams.errors.StreamsException",
"message": "Error serializing message to topic: OUTPUT. Missing default value for required field: [c2]."
"message": "Error serializing message to topic: OUTPUT. Missing default value for required Avro field: [c2]."
}
},
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ private static void ensureKeySchemasMatch(

final FormatInfo formatInfo = addSerializerMissingFormatFields(
schemaRegistryClient,
dataSource.getKsqlTopic().getValueFormat().getFormatInfo(),
dataSource.getKsqlTopic().getKeyFormat().getFormatInfo(),
dataSource.getKafkaTopicName(),
true
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.serde.avro;

import io.confluent.ksql.serde.connect.ConnectSRSchemaDataTranslator;
import io.confluent.ksql.util.KsqlException;
import java.util.Optional;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;

/**
 * Translates KSQL data and schemas to Avro equivalents.
 *
 * <p>Responsible for converting the KSQL schema to a version ready for connect to convert to an
 * avro schema.
 *
 * <p>This includes ensuring field names are valid Avro field names and that nested types do not
 * have name clashes.
 */
public class AvroSRSchemaDataTranslator extends ConnectSRSchemaDataTranslator {

  AvroSRSchemaDataTranslator(final Schema schema) {
    super(schema);
  }

  /**
   * Rebuilds the given KSQL row as a {@link Struct} conforming to the physical (Schema Registry)
   * schema held by this translator.
   *
   * <p>Fields present in the physical schema but missing from the KSQL row are filled with the
   * field's schema default (which may be {@code null} for optional fields). A required field with
   * no default cannot be satisfied and raises a {@link KsqlException}.
   *
   * @param ksqlData the KSQL row; returned unchanged when it is not a {@link Struct}
   * @return a {@link Struct} matching the physical schema, or {@code ksqlData} as-is
   * @throws KsqlException if a required physical-schema field has no value and no default
   */
  @Override
  public Object toConnectRow(final Object ksqlData) {
    if (!(ksqlData instanceof Struct)) {
      return ksqlData;
    }
    final Struct originalData = (Struct) ksqlData;
    final Schema originalSchema = originalData.schema();

    validate(originalSchema);

    final Schema schema = getSchema();
    final Struct struct = new Struct(schema);

    for (final Field field : schema.fields()) {
      // Schema.field(name) is a direct name lookup; avoids re-scanning the
      // source field list for every target field (O(n^2) -> O(n)).
      final Field originalField = originalSchema.field(field.name());
      if (originalField != null) {
        struct.put(field, originalData.get(originalField));
      } else if (field.schema().defaultValue() != null || field.schema().isOptional()) {
        // Extra optional/defaulted field in the physical schema: use its default
        // (null is a valid default for optional fields).
        struct.put(field, field.schema().defaultValue());
      } else {
        throw new KsqlException("Missing default value for required Avro field: [" + field.name()
            + "]. This field appears in Avro schema in Schema Registry");
      }
    }

    return struct;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.ksql.serde.SerdeUtils;
import io.confluent.ksql.serde.connect.ConnectDataTranslator;
import io.confluent.ksql.serde.connect.ConnectSRSchemaDataTranslator;
import io.confluent.ksql.serde.connect.DataTranslator;
import io.confluent.ksql.serde.connect.KsqlConnectDeserializer;
import io.confluent.ksql.serde.connect.KsqlConnectSerializer;
Expand Down Expand Up @@ -151,7 +150,7 @@ private DataTranslator createAvroTranslator(final Schema schema,
// deserialization, if physical schema exists, we use original schema to translate to ksql data.
return physicalSchema.<DataTranslator>map(
value -> isDeserializer ? new ConnectDataTranslator(schema)
: new ConnectSRSchemaDataTranslator(value, AvroFormat.NAME))
: new AvroSRSchemaDataTranslator(value))
.orElseGet(() -> new AvroDataTranslator(schema, fullSchemaName));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,7 @@

package io.confluent.ksql.serde.connect;

import io.confluent.ksql.serde.avro.AvroFormat;
import io.confluent.ksql.serde.protobuf.ProtobufFormat;
import io.confluent.ksql.util.KsqlException;
import java.util.Optional;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Schema.Type;
Expand All @@ -33,19 +30,9 @@
* fail.
*/
public class ConnectSRSchemaDataTranslator extends ConnectDataTranslator {
private RowTranslator rowTranslator;

public ConnectSRSchemaDataTranslator(final Schema schema, final String formatName) {
public ConnectSRSchemaDataTranslator(final Schema schema) {
super(schema);

switch (formatName.trim().toUpperCase()) {
case AvroFormat.NAME:
case ProtobufFormat.NAME:
this.rowTranslator = new AvroAndProtobufRowTranslator();
break;
default:
this.rowTranslator = new DefaultRowTranslator();
}
}

protected void validate(final Schema originalSchema) {
Expand All @@ -66,77 +53,25 @@ protected void validate(final Schema originalSchema) {

@Override
public Object toConnectRow(final Object ksqlData) {
return rowTranslator.translate(ksqlData);
}

private interface RowTranslator {
Object translate(Object ksqlData);
}

/**
* Reconstruct ksqlData struct with given schema and try to put original data in it.
* Schema may have more fields than ksqlData, don't put those field by default. If needed by
* some format like Avro, use the AvroAndProtobufRowTranslator
*/
private class DefaultRowTranslator implements RowTranslator {
@Override
public Object translate(final Object ksqlData) {
if (ksqlData instanceof Struct) {
validate(((Struct) ksqlData).schema());
final Schema schema = getSchema();
final Struct struct = new Struct(schema);
final Struct source = (Struct) ksqlData;

for (final Field sourceField : source.schema().fields()) {
final Object value = source.get(sourceField);
struct.put(sourceField.name(), value);
}

return struct;
}

return ksqlData;
}
}

/**
* Translates KSQL data and schemas to Avro and Protobuf equivalents.
*
* <p>Responsible for converting the KSQL schema to a version ready for connect to convert to an
* avro and protobuf schema.
*
* <p>This includes ensuring field names are valid Avro and Protobuf field names and that nested
* types do not have name clashes.
*/
private class AvroAndProtobufRowTranslator implements RowTranslator {
@Override
public Object translate(final Object ksqlData) {
if (!(ksqlData instanceof Struct)) {
return ksqlData;
}
/*
* Reconstruct ksqlData struct with given schema and try to put original data in it.
* Schema may have more fields than ksqlData, don't put those field by default. If needed by
* some format like Avro, create new subclass to handle
*/
if (ksqlData instanceof Struct) {
validate(((Struct) ksqlData).schema());
final Schema schema = getSchema();
final Struct struct = new Struct(schema);
final Struct originalData = (Struct) ksqlData;
final Schema originalSchema = originalData.schema();
final Struct source = (Struct) ksqlData;

validate(originalSchema);

for (final Field field : schema.fields()) {
final Optional<Field> originalField = originalSchema.fields().stream()
.filter(f -> field.name().equals(f.name())).findFirst();
if (originalField.isPresent()) {
struct.put(field, originalData.get(originalField.get()));
} else {
if (field.schema().defaultValue() != null || field.schema().isOptional()) {
struct.put(field, field.schema().defaultValue());
} else {
throw new KsqlException("Missing default value for required field: [" + field.name()
+ "]. This field appears in the schema in Schema Registry");
}
}
for (final Field sourceField : source.schema().fields()) {
final Object value = source.get(sourceField);
struct.put(sourceField.name(), value);
}

return struct;
}

return ksqlData;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,7 @@ private <T> Serializer<T> createSerializer(
: getConverter();

final ConnectDataTranslator dataTranslator =
physicalSchema.isPresent()
? new ConnectSRSchemaDataTranslator(physicalSchema.get(), JsonSchemaFormat.NAME)
physicalSchema.isPresent() ? new ConnectSRSchemaDataTranslator(physicalSchema.get())
: new ConnectDataTranslator(schema);

return new KsqlConnectSerializer<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,7 @@

import io.confluent.ksql.serde.connect.ConnectDataTranslator;
import io.confluent.ksql.serde.connect.DataTranslator;
import io.confluent.ksql.util.DecimalUtil;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
Expand Down Expand Up @@ -52,64 +46,32 @@ public Object toKsqlRow(final Schema connectSchema, final Object connectObject)
return null;
}

return replaceSchema(ksqlSchema, protoCompatibleRow);
return objectWithSchema(ksqlSchema, protoCompatibleRow);
}

@Override
public Object toConnectRow(final Object ksqlData) {
final Object compatible = replaceSchema(protoCompatibleSchema, ksqlData);
final Object compatible = objectWithSchema(protoCompatibleSchema, ksqlData);
return innerTranslator.toConnectRow(compatible);
}

private static Struct convertStruct(
final Struct source,
final Schema targetSchema
) {
final Struct struct = new Struct(targetSchema);
private static Object objectWithSchema(final Schema schema, final Object object) {
if (object == null || schema.type() != Schema.Type.STRUCT) {
return object;
}

final Struct source = (Struct)object;
final Struct struct = new Struct(schema);

final Iterator<Field> sourceIt = source.schema().fields().iterator();

for (final Field targetField : targetSchema.fields()) {
for (final Field targetField : schema.fields()) {
final Field sourceField = sourceIt.next();
final Object value = source.get(sourceField);
final Object adjusted = replaceSchema(targetField.schema(), value);
struct.put(targetField, adjusted);
}

return struct;
}

@SuppressWarnings("unchecked")
private static Object replaceSchema(final Schema schema, final Object object) {
if (object == null) {
return null;
struct.put(targetField, value);
}
switch (schema.type()) {
case ARRAY:
final List<Object> ksqlArray = new ArrayList<>(((List) object).size());
((List) object).forEach(
e -> ksqlArray.add(replaceSchema(schema.valueSchema(), e)));
return ksqlArray;

case MAP:
final Map<Object, Object> ksqlMap = new HashMap<>();
((Map<Object, Object>) object).forEach(
(key, value) -> ksqlMap.put(
replaceSchema(schema.keySchema(), key),
replaceSchema(schema.valueSchema(), value)
));
return ksqlMap;

case STRUCT:
return convertStruct((Struct) object, schema);
case BYTES:
if (DecimalUtil.isDecimal(schema)) {
return DecimalUtil.ensureFit((BigDecimal) object, schema);
} else {
return object;
}
default:
return object;
}
return struct;
}
}
Loading

0 comments on commit 1c3c8b2

Please sign in to comment.