Add PROTOBUF_NOSR (#9078)
This patch adds a new format, PROTOBUF_NOSR, and fixes a bug in the QTT tests where multi-format tests always used JSON.

The new PROTOBUF_NOSR format allows serializing and deserializing Protobuf messages without using Schema Registry. Instead, it creates the internal Protobuf schema from the logical schema. The limitation of this format is that it does not support dependencies on other schemas.
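
As a rough illustration of that idea (not the classes this patch actually adds), the sketch below derives a Protobuf schema locally from a Connect-style logical schema using Confluent's ProtobufData converter; the class name, field names, and configuration are made up for the example, and no Schema Registry call is involved.

// Hedged sketch: derive a Protobuf schema from a logical (Connect) schema without
// contacting Schema Registry. ProtobufData/ProtobufDataConfig come from Confluent's
// Connect Protobuf converter and are used here only to illustrate the idea.
import io.confluent.connect.protobuf.ProtobufData;
import io.confluent.connect.protobuf.ProtobufDataConfig;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import java.util.Collections;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

public final class NoSrSchemaSketch {
  public static void main(final String[] args) {
    // A logical schema expressed as a Connect schema (field names are made up).
    final Schema logicalSchema = SchemaBuilder.struct()
        .field("ID", Schema.OPTIONAL_INT64_SCHEMA)
        .field("NAME", Schema.OPTIONAL_STRING_SCHEMA)
        .build();

    // Derive the Protobuf schema locally; no Schema Registry lookup is involved.
    final ProtobufData protobufData =
        new ProtobufData(new ProtobufDataConfig(Collections.emptyMap()));
    final ProtobufSchema protobufSchema = protobufData.fromConnectSchema(logicalSchema);

    // Print the generated .proto definition.
    System.out.println(protobufSchema.canonicalString());
  }
}

Because the schema is derived entirely from the logical schema, references to other .proto files cannot be resolved, which is where the dependency limitation above comes from.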

The new format only works with Protobuf messages serialized directly to bytes. As a result, it won't work with io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer, which uses a custom serialization format that prepends a magic byte, the Schema Registry ID, and message indexes to the payload.
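
For context, the sketch below is a hypothetical helper (not code from this patch) that walks the Schema Registry framing KafkaProtobufSerializer produces, following Confluent's documented wire format; PROTOBUF_NOSR expects only the bare Protobuf bytes that come after this framing, so such payloads cannot be deserialized by it.

import java.nio.ByteBuffer;

// Hypothetical helper inspecting the Schema Registry framing produced by
// KafkaProtobufSerializer: a 0x00 magic byte and a 4-byte schema id, followed by
// varint-encoded message indexes and then the Protobuf payload. The message-index
// section is left undecoded here.
public final class SrFramingSketch {

  public static void describe(final byte[] serialized) {
    final ByteBuffer buf = ByteBuffer.wrap(serialized);
    final byte magic = buf.get();       // magic byte (0x00)
    final int schemaId = buf.getInt();  // 4-byte Schema Registry schema id
    // The message indexes and the actual Protobuf message bytes follow.
    System.out.printf("magic=%d schemaId=%d, %d bytes remaining%n",
        magic, schemaId, buf.remaining());
  }

  public static void main(final String[] args) {
    // Example framed payload: magic 0x00, schema id 1, a single 0x00 message index,
    // followed by an (empty) Protobuf message body.
    describe(new byte[] {0, 0, 0, 0, 1, 0});
  }
}
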
Gerrrr authored May 5, 2022
1 parent 4cc65d5 commit 53afd8a
Showing 342 changed files with 51,243 additions and 495 deletions.
@@ -19,28 +19,14 @@
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.ksql.serde.SerdeUtils;
import io.confluent.ksql.test.TestFrameworkException;
import io.confluent.ksql.util.DecimalUtil;
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.KsqlException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Time;
import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.storage.Converter;

/**
@@ -113,99 +99,9 @@ public byte[] serialize(final String topic, final Object spec) {
      return converter.fromConnectData(
          topic,
          connectSchema,
-         specToConnect(spec, connectSchema)
+         SpecToConnectConverter.specToConnect(spec, connectSchema)
      );
    }

    // CHECKSTYLE_RULES.OFF: CyclomaticComplexity
    private Object specToConnect(final Object spec, final Schema schema) {
      // CHECKSTYLE_RULES.ON: CyclomaticComplexity
      if (spec == null) {
        return null;
      }

      switch (schema.type()) {
        case INT32:
          final Integer intVal = Integer.valueOf(spec.toString());
          if (Time.LOGICAL_NAME.equals(schema.name())) {
            return new java.sql.Time(intVal);
          }
          if (org.apache.kafka.connect.data.Date.LOGICAL_NAME.equals(schema.name())) {
            return SerdeUtils.getDateFromEpochDays(intVal);
          }
          return intVal;
        case INT64:
          final Long longVal = Long.valueOf(spec.toString());
          if (Timestamp.LOGICAL_NAME.equals(schema.name())) {
            return new java.sql.Timestamp(longVal);
          }
          return longVal;
        case FLOAT32:
          return Float.valueOf(spec.toString());
        case FLOAT64:
          return Double.valueOf(spec.toString());
        case BOOLEAN:
          return Boolean.valueOf(spec.toString());
        case STRING:
          return spec.toString();
        case ARRAY:
          return ((List<?>) spec)
              .stream()
              .map(el -> specToConnect(el, schema.valueSchema()))
              .collect(Collectors.toList());
        case MAP:
          return ((Map<?, ?>) spec)
              .entrySet()
              .stream()
              // cannot use Collectors.toMap due to JDK bug:
              // https://bugs.openjdk.java.net/browse/JDK-8148463
              .collect(
                  HashMap::new,
                  ((map, v) -> map.put(
                      specToConnect(v.getKey(), schema.keySchema()),
                      specToConnect(v.getValue(), schema.valueSchema()))),
                  HashMap::putAll
              );
        case STRUCT:
          final Map<String, String> caseInsensitiveFieldMap = schema.fields()
              .stream()
              .collect(Collectors.toMap(
                  f -> f.name().toUpperCase(),
                  Field::name
              ));

          final Struct struct = new Struct(schema);
          ((Map<?, ?>) spec)
              .forEach((key, value) -> {
                final String realKey = caseInsensitiveFieldMap.get(key.toString().toUpperCase());
                if (realKey != null) {
                  struct.put(realKey, specToConnect(value, schema.field(realKey).schema()));
                }
              });
          return struct;
        case BYTES:
          if (DecimalUtil.isDecimal(schema)) {
            if (spec instanceof BigDecimal) {
              return DecimalUtil.ensureFit((BigDecimal) spec, schema);
            }

            if (spec instanceof String) {
              // Supported for legacy reasons...
              return DecimalUtil.cast(
                  (String) spec,
                  DecimalUtil.precision(schema),
                  DecimalUtil.scale(schema));
            }

            throw new TestFrameworkException("DECIMAL type requires JSON number in test data");
          } else {
            return spec;
          }
        default:
          throw new RuntimeException(
              "This test does not support the data type yet: " + schema.type());
      }
    }
  }

  private class SpecDeserializer implements Deserializer<Object> {
@@ -227,78 +123,9 @@ public Object deserialize(final String topic, final byte[] bytes) {
      }

      final SchemaAndValue schemaAndValue = converter.toConnectData(topic, bytes);
-     return connectToSpec(schemaAndValue.value(), schemaAndValue.schema(), false);
+     return SpecToConnectConverter.connectToSpec(schemaAndValue.value(),
+         schemaAndValue.schema(),
+         false);
    }

    // CHECKSTYLE_RULES.OFF: CyclomaticComplexity
    private Object connectToSpec(
        final Object data,
        final Schema schema,
        final boolean toUpper
    ) {
      // CHECKSTYLE_RULES.ON: CyclomaticComplexity
      if (data == null) {
        return null;
      }

      switch (schema.type()) {
        case INT64:
          if (Timestamp.LOGICAL_NAME.equals(schema.name())) {
            return Timestamp.fromLogical(schema, (Date) data);
          }
          return data;
        case INT32:
          if (Time.LOGICAL_NAME.equals(schema.name())) {
            return Time.fromLogical(schema, (Date) data);
          }
          if (org.apache.kafka.connect.data.Date.LOGICAL_NAME.equals(schema.name())) {
            return org.apache.kafka.connect.data.Date.fromLogical(schema, (Date) data);
          }
          return data;
        case FLOAT32:
        case FLOAT64:
        case BOOLEAN:
          return data;
        case STRING:
          return data.toString();
        case ARRAY:
          return ((List<?>) data)
              .stream()
              .map(v -> connectToSpec(v, schema.valueSchema(), toUpper))
              .collect(Collectors.toList());
        case MAP:
          final Map<String, Object> map = new HashMap<>();
          ((Map<?, ?>) data)
              .forEach((k, v) -> map.put(
                  k.toString(),
                  connectToSpec(v, schema.valueSchema(), toUpper)));
          return map;
        case STRUCT:
          final Map<String, Object> recordSpec = new HashMap<>();
          schema.fields()
              .forEach(f -> recordSpec.put(
                  toUpper ? f.name().toUpperCase() : f.name(),
                  connectToSpec(((Struct) data).get(f.name()), f.schema(), toUpper)));
          return recordSpec;
        case BYTES:
          if (DecimalUtil.isDecimal(schema)) {
            if (data instanceof BigDecimal) {
              return data;
            }
            throw new RuntimeException("Unexpected BYTES type " + schema.name());
          } else {
            if (data instanceof byte[]) {
              return ByteBuffer.wrap((byte[]) data);
            } else {
              return data;
            }
          }
        default:
          throw new RuntimeException("Test cannot handle data of type: " + schema.type());
      }
    }

  }


}