feat: introduce JSON_SR format (#4596)
agavra authored Feb 26, 2020
1 parent a475c5f commit daa04d2
Showing 37 changed files with 2,367 additions and 123 deletions.
@@ -23,6 +23,7 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableList;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.serde.json.JsonSerdeUtils;
import io.confluent.ksql.test.serde.SerdeSupplier;
import java.math.BigDecimal;
import java.util.Collection;
@@ -31,13 +32,19 @@
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;

public class ValueSpecJsonSerdeSupplier implements SerdeSupplier<Object> {

private static final ObjectMapper MAPPER = new ObjectMapper()
.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
private final boolean useSchemas;

public ValueSpecJsonSerdeSupplier(final boolean useSchemas) {
this.useSchemas = useSchemas;
}

@Override
public Serializer<Object> getSerializer(final SchemaRegistryClient schemaRegistryClient) {
@@ -49,7 +56,7 @@ public Deserializer<Object> getDeserializer(final SchemaRegistryClient schemaReg
return new ValueSpecJsonDeserializer();
}

private static final class ValueSpecJsonSerializer implements Serializer<Object> {
private final class ValueSpecJsonSerializer implements Serializer<Object> {
@Override
public void close() {
}
@@ -65,14 +72,21 @@ public byte[] serialize(final String topicName, final Object spec) {
}
try {
final Object toSerialize = Converter.toJsonNode(spec);
return MAPPER.writeValueAsBytes(toSerialize);
final byte[] bytes = MAPPER.writeValueAsBytes(toSerialize);
if (!useSchemas) {
return bytes;
}

return ArrayUtils.addAll(
new byte[]{/*magic*/ 0x00, /*schemaID*/ 0x00, 0x00, 0x00, 0x01},
bytes);
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
}

private static final class ValueSpecJsonDeserializer implements Deserializer<Object> {
private final class ValueSpecJsonDeserializer implements Deserializer<Object> {
@Override
public void close() {
}
@@ -87,7 +101,9 @@ public Object deserialize(final String topicName, final byte[] data) {
return null;
}
try {
return MAPPER.readValue(data, Object.class);
return useSchemas
? JsonSerdeUtils.readJsonSR(data, MAPPER, Object.class)
: MAPPER.readValue(data, Object.class);
} catch (final Exception e) {
throw new RuntimeException(e);
}
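Note: the serializer change above frames each JSON record with the Confluent Schema Registry wire format, a single magic byte (0x00) followed by a four-byte big-endian schema ID (hard-coded to 1 in this test serializer), then the JSON payload. Below is a minimal, self-contained sketch of that framing and of stripping it again on the read path. The class and method names are illustrative, and the behaviour attributed to JsonSerdeUtils.readJsonSR is an assumption based on how it is called above.

import com.fasterxml.jackson.databind.ObjectMapper;
import java.nio.ByteBuffer;
import java.util.Arrays;

public final class WireFormatSketch {

  private static final ObjectMapper MAPPER = new ObjectMapper();

  // Prepend the magic byte 0x00 and a 4-byte big-endian schema ID, mirroring
  // what ValueSpecJsonSerializer does above with ArrayUtils.addAll.
  static byte[] frame(final Object value, final int schemaId) throws Exception {
    final byte[] json = MAPPER.writeValueAsBytes(value);
    return ByteBuffer.allocate(5 + json.length)
        .put((byte) 0x00)
        .putInt(schemaId)
        .put(json)
        .array();
  }

  // Skip the 5-byte header before handing the payload to Jackson; this is the
  // behaviour assumed of JsonSerdeUtils.readJsonSR on the read path.
  static Object unframe(final byte[] data) throws Exception {
    return MAPPER.readValue(data, 5, data.length - 5, Object.class);
  }

  public static void main(final String[] args) throws Exception {
    final byte[] bytes = frame(Arrays.asList(1, 2, 3), 1);
    System.out.println(unframe(bytes)); // prints [1, 2, 3]
  }
}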
@@ -32,6 +32,7 @@
import io.confluent.ksql.serde.avro.AvroFormat;
import io.confluent.ksql.serde.delimited.DelimitedFormat;
import io.confluent.ksql.serde.json.JsonFormat;
import io.confluent.ksql.serde.json.JsonSchemaFormat;
import io.confluent.ksql.serde.kafka.KafkaFormat;
import io.confluent.ksql.serde.protobuf.ProtobufFormat;
import io.confluent.ksql.test.serde.SerdeSupplier;
@@ -66,7 +67,8 @@ public static SerdeSupplier<?> getSerdeSupplier(
switch (format.name()) {
case AvroFormat.NAME: return new ValueSpecAvroSerdeSupplier();
case ProtobufFormat.NAME: return new ValueSpecProtobufSerdeSupplier();
case JsonFormat.NAME: return new ValueSpecJsonSerdeSupplier();
case JsonFormat.NAME: return new ValueSpecJsonSerdeSupplier(false);
case JsonSchemaFormat.NAME: return new ValueSpecJsonSerdeSupplier(true);
case DelimitedFormat.NAME: return new StringSerdeSupplier();
case KafkaFormat.NAME: return new KafkaSerdeSupplier(schema);
default:
@@ -89,7 +91,8 @@ public static Optional<ParsedSchema> buildSchema(final JsonNode schema, final St
new AvroFormat()
.toParsedSchema(new AvroData(1).toConnectSchema(avroSchema))
);
} else if (format.equalsIgnoreCase(JsonFormat.NAME)) {
} else if (format.equalsIgnoreCase(JsonFormat.NAME)
|| format.equalsIgnoreCase(JsonSchemaFormat.NAME)) {
final String schemaString = OBJECT_MAPPER.writeValueAsString(schema);
return Optional.of(new JsonSchema(schemaString));
} else if (format.equalsIgnoreCase(ProtobufFormat.NAME)) {
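Note: the factory changes above wrap the test schema in a JsonSchema for both JSON and JSON_SR, while only JSON_SR gets the schema-aware serde supplier. Below is a small sketch of registering such a schema with a mock Schema Registry client, roughly what a test harness would do before producing JSON_SR records. It assumes the 5.5-era client API in which register accepts a ParsedSchema; the subject name and schema string are illustrative.

import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.json.JsonSchema;

public final class JsonSchemaRegistrationSketch {

  public static void main(final String[] args) throws Exception {
    final SchemaRegistryClient client = new MockSchemaRegistryClient();

    // buildSchema above turns the test spec's JSON node into a JsonSchema for
    // both JSON and JSON_SR; a trivial object schema stands in for it here.
    final ParsedSchema schema = new JsonSchema(
        "{\"type\":\"object\",\"properties\":"
            + "{\"V0\":{\"type\":\"array\",\"items\":{\"type\":\"integer\"}}}}");

    // Register under the topic's value subject, as a test harness would before
    // producing JSON_SR records for topic "input".
    final int id = client.register("input-value", schema);
    System.out.println("registered schema id: " + id);
  }
}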
@@ -0,0 +1,150 @@
{
"version" : "5.5.0",
"timestamp" : 1582741261090,
"plan" : [ {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM INPUT (V0 ARRAY<INTEGER>) WITH (KAFKA_TOPIC='input', VALUE_FORMAT='JSON_SR');",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "INPUT",
"schema" : "`ROWKEY` STRING KEY, `V0` ARRAY<INTEGER>",
"keyField" : null,
"timestampColumn" : null,
"topicName" : "input",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "JSON_SR",
"properties" : { }
},
"options" : [ ]
},
"windowInfo" : null
},
"queryPlan" : null
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM OUTPUT AS SELECT *\nFROM INPUT INPUT\nEMIT CHANGES",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "OUTPUT",
"schema" : "`ROWKEY` STRING KEY, `V0` ARRAY<INTEGER>",
"keyField" : null,
"timestampColumn" : null,
"topicName" : "OUTPUT",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "JSON_SR",
"properties" : { }
},
"options" : [ ]
},
"windowInfo" : null
},
"queryPlan" : {
"sources" : [ "INPUT" ],
"sink" : "OUTPUT",
"physicalPlan" : {
"@type" : "streamSinkV1",
"properties" : {
"queryContext" : "OUTPUT"
},
"source" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "Project"
},
"source" : {
"@type" : "streamSourceV1",
"properties" : {
"queryContext" : "KsqlTopic/Source"
},
"topicName" : "input",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "JSON_SR",
"properties" : { }
},
"options" : [ ]
},
"timestampColumn" : null,
"sourceSchema" : "`ROWKEY` STRING KEY, `V0` ARRAY<INTEGER>"
},
"selectExpressions" : [ "V0 AS V0" ]
},
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "JSON_SR",
"properties" : { }
},
"options" : [ ]
},
"topicName" : "OUTPUT"
},
"queryId" : "CSAS_OUTPUT_0"
}
} ],
"schemas" : {
"CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT<V0 ARRAY<INT>> NOT NULL",
"CSAS_OUTPUT_0.OUTPUT" : "STRUCT<V0 ARRAY<INT>> NOT NULL"
},
"configs" : {
"ksql.extension.dir" : "ext",
"ksql.streams.cache.max.bytes.buffering" : "0",
"ksql.security.extension.class" : null,
"ksql.transient.prefix" : "transient_",
"ksql.persistence.wrap.single.values" : "true",
"ksql.authorization.cache.expiry.time.secs" : "30",
"ksql.schema.registry.url" : "",
"ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler",
"ksql.output.topic.name.prefix" : "",
"ksql.streams.auto.offset.reset" : "earliest",
"ksql.query.pull.enable.standby.reads" : "false",
"ksql.connect.url" : "http://localhost:8083",
"ksql.service.id" : "some.ksql.service.id",
"ksql.internal.topic.min.insync.replicas" : "1",
"ksql.streams.shutdown.timeout.ms" : "300000",
"ksql.new.api.enabled" : "false",
"ksql.streams.state.dir" : "/var/folders/bz/qnz23q_j6v12b3b_tm9ztv700000gn/T/confluent7938322338186257046",
"ksql.internal.topic.replicas" : "1",
"ksql.insert.into.values.enabled" : "true",
"ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807",
"ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler",
"ksql.access.validator.enable" : "auto",
"ksql.streams.bootstrap.servers" : "localhost:0",
"ksql.streams.commit.interval.ms" : "2000",
"ksql.metric.reporters" : "",
"ksql.streams.auto.commit.interval.ms" : "0",
"ksql.metrics.extension" : null,
"ksql.streams.topology.optimization" : "all",
"ksql.query.pull.streamsstore.rebalancing.timeout.ms" : "10000",
"ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.streams.num.stream.threads" : "4",
"ksql.authorization.cache.max.entries" : "10000",
"ksql.metrics.tags.custom" : "",
"ksql.pull.queries.enable" : "true",
"ksql.udfs.enabled" : "true",
"ksql.udf.enable.security.manager" : "true",
"ksql.connect.worker.config" : "",
"ksql.sink.window.change.log.additional.retention" : "1000000",
"ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.udf.collect.metrics" : "false",
"ksql.persistent.prefix" : "query_",
"ksql.query.persistent.active.limit" : "2147483647"
}
}
@@ -0,0 +1,13 @@
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [input])
--> KSTREAM-TRANSFORMVALUES-0000000001
Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: [])
--> Project
<-- KSTREAM-SOURCE-0000000000
Processor: Project (stores: [])
--> KSTREAM-SINK-0000000003
<-- KSTREAM-TRANSFORMVALUES-0000000001
Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT)
<-- Project
