refactor: remove Struct from Format interface (#6206)
Following on from #6200, this change modifies the `Format` interface so that the created `Serde` is `Serde<List<?>>` rather than the previous `Serde<Struct>`, i.e. the format returns a serde capable of handling a list of values, rather than requiring those values to be wrapped in a Connect `Struct`.
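
For reference, a condensed sketch of the changed method on the `Format` interface (the full interface has more members; parameter names come from the diff below, and the SR client factory type is assumed to be `Supplier<SchemaRegistryClient>`):

```java
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.schema.ksql.PersistenceSchema;
import io.confluent.ksql.util.KsqlConfig;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.kafka.common.serialization.Serde;

public interface Format {

  // Before this change the return type was Serde<Struct>.
  Serde<List<?>> getSerde(
      PersistenceSchema schema,
      Map<String, String> formatProperties,
      KsqlConfig ksqlConfig,
      Supplier<SchemaRegistryClient> srClientFactory
  );
}
```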

This is part of the work of moving away from using Connect's `Struct` type internally.

All the `ConnectFormat` sub-classes still internally use `Serde<Struct>` for now, as that's what the Connect code returns. This change is not trying to fix that.

`DELIMITED` and `KAFKA` formats no longer know about any `Connect` types, i.e. no `Struct` and no `Schema`. Yay.

In the engine the key is still passed around as a `Struct`, and this change is not trying to fix that, so there is code to convert the returned `List<?>` into the key `Struct` (sketched below). That conversion will go away once the key is no longer a `Struct`. Because the key is currently only a single, primitive field, the overhead is very low, and the `KAFKA` format was already doing this internally anyway, so there is no change in performance.
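
A minimal sketch of that conversion, mirroring the new `GenericKeyDeserializer` further down in this diff (the `KeyConversion` class and `toKeyStruct` helper are illustrative names, not code from this commit):

```java
import java.util.Iterator;
import java.util.List;
import org.apache.kafka.connect.data.ConnectSchema;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;

final class KeyConversion {

  // Rebuild the key Struct the engine still expects from the List<?> the format now returns.
  static Struct toKeyStruct(final ConnectSchema keySchema, final List<?> values) {
    if (values == null) {
      return null;
    }

    final List<Field> fields = keySchema.fields();
    if (fields.size() != values.size()) {
      throw new IllegalArgumentException(
          "Expected " + fields.size() + " key columns, got " + values.size());
    }

    final Struct key = new Struct(keySchema);
    final Iterator<Field> fieldIt = fields.iterator();
    final Iterator<?> valueIt = values.iterator();
    while (fieldIt.hasNext()) {
      key.put(fieldIt.next(), valueIt.next());
    }
    return key;
  }
}
```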

Of course, any individual value in the returned `List<?>` can still be a Connect `Struct` if the column type is a struct. This change is not trying to fix that; it should be addressed later.

Serde performance for JSON and AVRO remains the same (no benchmarks are available for other formats yet).

Previous benchmark results (run locally):

```
Benchmark                   (schemaName)  (serializationFormat)  Mode  Cnt  Score   Error  Units
SerdeBenchmark.deserialize   impressions                   JSON  avgt    3  1.148 ± 0.205  us/op
SerdeBenchmark.deserialize   impressions                   Avro  avgt    3  2.056 ± 1.141  us/op
SerdeBenchmark.deserialize       metrics                   JSON  avgt    3  5.504 ± 1.535  us/op
SerdeBenchmark.deserialize       metrics                   Avro  avgt    3  7.564 ± 3.445  us/op
SerdeBenchmark.serialize     impressions                   JSON  avgt    3  0.556 ± 0.085  us/op
SerdeBenchmark.serialize     impressions                   Avro  avgt    3  1.420 ± 3.746  us/op
SerdeBenchmark.serialize         metrics                   JSON  avgt    3  2.909 ± 1.162  us/op
SerdeBenchmark.serialize         metrics                   Avro  avgt    3  5.076 ± 0.383  us/op
```

Latest benchmark results (no statistically significant change):

```
Benchmark                   (schemaName)  (serializationFormat)  Mode  Cnt  Score   Error  Units
SerdeBenchmark.deserialize   impressions                   JSON  avgt    3  1.191 ± 0.520  us/op
SerdeBenchmark.deserialize   impressions                   Avro  avgt    3  2.032 ± 0.512  us/op
SerdeBenchmark.deserialize       metrics                   JSON  avgt    3  5.324 ± 1.457  us/op
SerdeBenchmark.deserialize       metrics                   Avro  avgt    3  7.604 ± 2.389  us/op
SerdeBenchmark.serialize     impressions                   JSON  avgt    3  0.589 ± 0.027  us/op
SerdeBenchmark.serialize     impressions                   Avro  avgt    3  1.307 ± 0.147  us/op
SerdeBenchmark.serialize         metrics                   JSON  avgt    3  2.752 ± 0.583  us/op
SerdeBenchmark.serialize         metrics                   Avro  avgt    3  5.005 ± 0.780  us/op
```

Co-authored-by: Andy Coates <[email protected]>
big-andy-coates authored Sep 21, 2020
1 parent 07464df commit ec91d2b
Showing 29 changed files with 801 additions and 1,042 deletions.
@@ -23,6 +23,7 @@
import static org.hamcrest.Matchers.is;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
@@ -33,6 +34,8 @@
import io.confluent.ksql.logging.processing.ProcessingLogContext;
import io.confluent.ksql.schema.ksql.PersistenceSchema;
import io.confluent.ksql.schema.ksql.PhysicalSchema;
import io.confluent.ksql.schema.ksql.SchemaConverters;
import io.confluent.ksql.schema.ksql.SimpleColumn;
import io.confluent.ksql.serde.Format;
import io.confluent.ksql.serde.FormatInfo;
import io.confluent.ksql.serde.GenericRowSerDe;
@@ -63,6 +66,7 @@
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.test.TestUtils;
@@ -636,47 +640,53 @@ protected void after() {
}

@SuppressWarnings({"unchecked", "rawtypes"})
private static <K> Serde<K> getKeySerde(final PhysicalSchema schema) {
final PersistenceSchema keySchema = schema.keySchema();

final SimpleColumn onlyColumn = Iterables.getOnlyElement(keySchema.columns());

final Class<?> javaType = SchemaConverters.sqlToJavaConverter()
.toJavaType(onlyColumn.type());

return (Serde) KafkaSerdeFactory
.getPrimitiveSerde(onlyColumn.type().baseType(), javaType);
}

private static <K> Serializer<K> getKeySerializer(final PhysicalSchema schema) {
return (Serializer) KafkaSerdeFactory
.getPrimitiveSerde(schema.keySchema())
.serializer();
return IntegrationTestHarness.<K>getKeySerde(schema).serializer();
}

private Serializer<GenericRow> getValueSerializer(
private static <K> Deserializer<K> getKeyDeserializer(final PhysicalSchema schema) {
return IntegrationTestHarness.<K>getKeySerde(schema).deserializer();
}

private Serde<GenericRow> getValueSerde(
final Format format,
final PhysicalSchema schema
final PhysicalSchema schema,
final String loggerNamePrefix
) {
return GenericRowSerDe.from(
FormatInfo.of(format.name()),
schema.valueSchema(),
new KsqlConfig(Collections.emptyMap()),
serviceContext.get().getSchemaRegistryClientFactory(),
"producer",
loggerNamePrefix,
ProcessingLogContext.create()
).serializer();
);
}

@SuppressWarnings({"unchecked", "rawtypes"})
private static <K> Deserializer<K> getKeyDeserializer(
private Serializer<GenericRow> getValueSerializer(
final Format format,
final PhysicalSchema schema
) {
return (Deserializer) KafkaSerdeFactory
.getPrimitiveSerde(schema.keySchema())
.deserializer();
return getValueSerde(format, schema, "producer").serializer();
}

private Deserializer<GenericRow> getValueDeserializer(
final Format format,
final PhysicalSchema schema
) {
return GenericRowSerDe.from(
FormatInfo.of(format.name()),
schema.valueSchema(),
new KsqlConfig(Collections.emptyMap()),
serviceContext.get().getSchemaRegistryClientFactory(),
"consumer",
ProcessingLogContext.create()
).deserializer();
return getValueSerde(format, schema, "consumer").deserializer();
}

public void ensureSchema(
@@ -43,6 +43,7 @@
import io.confluent.ksql.query.id.SequentialQueryIdGenerator;
import io.confluent.ksql.schema.ksql.PersistenceSchema;
import io.confluent.ksql.schema.utils.FormatOptions;
import io.confluent.ksql.serde.GenericKeySerDe;
import io.confluent.ksql.serde.GenericRowSerDe;
import io.confluent.ksql.services.FakeKafkaTopicClient;
import io.confluent.ksql.services.ServiceContext;
@@ -338,14 +339,14 @@ private Serde<Struct> keySerde(final DataSource sinkSource) {
sinkSource.getSerdeOptions().keyFeatures()
);

return sinkSource.getKsqlTopic().getKeyFormat()
.getFormat()
.getSerde(
schema,
sinkSource.getKsqlTopic().getKeyFormat().getFormatInfo().getProperties(),
config,
serviceContext.getSchemaRegistryClientFactory()
);
return new GenericKeySerDe().create(
sinkSource.getKsqlTopic().getKeyFormat().getFormatInfo(),
schema,
config,
serviceContext.getSchemaRegistryClientFactory(),
"",
NoopProcessingLogContext.INSTANCE
);
}

private Serde<GenericRow> valueSerde(final DataSource sinkSource) {
@@ -31,6 +31,7 @@
import io.confluent.ksql.serde.GenericRowSerDe;
import io.confluent.ksql.serde.KeySerdeFactory;
import io.confluent.ksql.serde.ValueSerdeFactory;
import io.confluent.ksql.serde.connect.ConnectSchemas;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.VertxUtils;
@@ -44,6 +45,7 @@
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.connect.data.ConnectSchema;
import org.apache.kafka.connect.data.Struct;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
@@ -59,6 +61,7 @@ public final class InsertsSubscriber extends BaseSubscriber<JsonObject> implemen

private final Producer<byte[], byte[]> producer;
private final DataSource dataSource;
private final ConnectSchema keySchema;
private final Serializer<Struct> keySerializer;
private final Serializer<GenericRow> valueSerializer;
private final BufferedPublisher<InsertResult> acksPublisher;
@@ -67,10 +70,15 @@ public final class InsertsSubscriber extends BaseSubscriber<JsonObject> implemen
private boolean drainHandlerSet;
private long sequence;

public static InsertsSubscriber createInsertsSubscriber(final ServiceContext serviceContext,
final JsonObject properties, final DataSource dataSource, final KsqlConfig ksqlConfig,
final Context context, final Subscriber<InsertResult> acksSubscriber,
final WorkerExecutor workerExecutor) {
public static InsertsSubscriber createInsertsSubscriber(
final ServiceContext serviceContext,
final JsonObject properties,
final DataSource dataSource,
final KsqlConfig ksqlConfig,
final Context context,
final Subscriber<InsertResult> acksSubscriber,
final WorkerExecutor workerExecutor
) {
final KsqlConfig configCopy = ksqlConfig.cloneWithPropertyOverwrite(properties.getMap());
final Producer<byte[], byte[]> producer = serviceContext
.getKafkaClientSupplier()
@@ -107,15 +115,19 @@ public static InsertsSubscriber createInsertsSubscriber(final ServiceContext ser
valueSerde.serializer(), acksPublisher, workerExecutor);
}

private InsertsSubscriber(final Context context,
final Producer<byte[], byte[]> producer, final DataSource dataSource,
private InsertsSubscriber(
final Context context,
final Producer<byte[], byte[]> producer,
final DataSource dataSource,
final Serializer<Struct> keySerializer,
final Serializer<GenericRow> valueSerializer,
final BufferedPublisher<InsertResult> acksPublisher,
final WorkerExecutor workerExecutor) {
final WorkerExecutor workerExecutor
) {
super(context);
this.producer = Objects.requireNonNull(producer);
this.dataSource = Objects.requireNonNull(dataSource);
this.keySchema = ConnectSchemas.columnsToConnectSchema(dataSource.getSchema().key());
this.keySerializer = Objects.requireNonNull(keySerializer);
this.valueSerializer = Objects.requireNonNull(valueSerializer);
this.acksPublisher = Objects.requireNonNull(acksPublisher);
@@ -190,7 +202,7 @@ private void checkRequest() {
}

private Struct extractKey(final JsonObject values) {
return KeyValueExtractor.extractKey(values, dataSource.getSchema(), SQL_VALUE_COERCER);
return KeyValueExtractor.extractKey(values, keySchema, SQL_VALUE_COERCER);
}

private GenericRow extractValues(final JsonObject values) {
@@ -23,7 +23,6 @@
import io.confluent.ksql.schema.ksql.SchemaConverters;
import io.confluent.ksql.schema.ksql.SqlValueCoercer;
import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.serde.connect.ConnectSchemas;
import io.confluent.ksql.util.ParserUtil;
import io.vertx.core.json.JsonObject;
import java.util.ArrayList;
@@ -39,11 +38,10 @@ private KeyValueExtractor() {

public static Struct extractKey(
final JsonObject values,
final LogicalSchema logicalSchema,
final ConnectSchema connectSchema,
final SqlValueCoercer sqlValueCoercer
) {
final ConnectSchema keySchema = ConnectSchemas.columnsToConnectSchema(logicalSchema.key());
final Struct key = new Struct(keySchema);
final Struct key = new Struct(connectSchema);
for (final Field field : key.schema().fields()) {
final Object value = values.getValue(field.name());
if (value == null) {
@@ -30,7 +30,6 @@
import java.util.function.Supplier;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.connect.data.Struct;

/**
* A {@code Format} is a serialization specification of a Kafka topic
@@ -174,7 +173,7 @@ default Set<String> getInheritableProperties() {
* @param srClientFactory supplier of the SR client
* @return a serde pair capable of (de)serializing the data in this format.
*/
Serde<Struct> getSerde(
Serde<List<?>> getSerde(
PersistenceSchema schema,
Map<String, String> formatProperties,
KsqlConfig ksqlConfig,
@@ -15,15 +15,27 @@

package io.confluent.ksql.serde;

import static java.util.Objects.requireNonNull;

import com.google.common.annotations.VisibleForTesting;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.logging.processing.ProcessingLogContext;
import io.confluent.ksql.schema.ksql.PersistenceSchema;
import io.confluent.ksql.serde.connect.ConnectSchemas;
import io.confluent.ksql.util.KsqlConfig;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.connect.data.ConnectSchema;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.WindowedSerdes.SessionWindowedSerde;
@@ -94,14 +106,111 @@ private Serde<Struct> createInner(
final String loggerNamePrefix,
final ProcessingLogContext processingLogContext
) {
final Serde<Struct> formatSerde = innerFactory
final Serde<List<?>> formatSerde = innerFactory
.createFormatSerde("Key", format, schema, ksqlConfig, schemaRegistryClientFactory);

final Serde<Struct> structSerde = toStructSerde(formatSerde, schema);

final Serde<Struct> serde = innerFactory
.wrapInLoggingSerde(formatSerde, loggerNamePrefix, processingLogContext);
.wrapInLoggingSerde(structSerde, loggerNamePrefix, processingLogContext);

serde.configure(Collections.emptyMap(), true);

return serde;
}

private static Serde<Struct> toStructSerde(
final Serde<List<?>> inner,
final PersistenceSchema schema
) {
final ConnectSchema connectSchema = ConnectSchemas.columnsToConnectSchema(schema.columns());
return Serdes.serdeFrom(
new GenericKeySerializer(inner.serializer(), connectSchema.fields().size()),
new GenericKeyDeserializer(inner.deserializer(), connectSchema)
);
}

@VisibleForTesting
static class GenericKeySerializer implements Serializer<Struct> {

private final Serializer<List<?>> inner;
private final int numColumns;

GenericKeySerializer(final Serializer<List<?>> inner, final int numColumns) {
this.inner = requireNonNull(inner, "inner");
this.numColumns = numColumns;
}

@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
inner.configure(configs, isKey);
}

@Override
public byte[] serialize(final String topic, final Struct data) {
if (data == null) {
return inner.serialize(topic, null);
}

final List<Field> fields = data.schema().fields();

SerdeUtils.throwOnColumnCountMismatch(numColumns, fields.size(), true, topic);

final ArrayList<Object> values = new ArrayList<>(numColumns);
for (final Field field : fields) {
values.add(data.get(field));
}

return inner.serialize(topic, values);
}

@Override
public void close() {
inner.close();
}
}

@VisibleForTesting
static class GenericKeyDeserializer implements Deserializer<Struct> {

private final Deserializer<List<?>> inner;
private final ConnectSchema connectSchema;

GenericKeyDeserializer(final Deserializer<List<?>> inner, final ConnectSchema connectSchema) {
this.inner = requireNonNull(inner, "inner");
this.connectSchema = requireNonNull(connectSchema, "connectSchema");
}

@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
inner.configure(configs, isKey);
}

@Override
public void close() {
inner.close();
}

@Override
public Struct deserialize(final String topic, final byte[] data) {
final List<?> values = inner.deserialize(topic, data);
if (values == null) {
return null;
}

final List<Field> fields = connectSchema.fields();

SerdeUtils.throwOnColumnCountMismatch(fields.size(), values.size(), false, topic);

final Struct row = new Struct(connectSchema);

final Iterator<Field> fIt = fields.iterator();
final Iterator<?> vIt = values.iterator();
while (fIt.hasNext()) {
row.put(fIt.next(), vIt.next());
}

return row;
}
}
}