From bed164bec9c003c94e98ce033bf4f9bc8ebebf46 Mon Sep 17 00:00:00 2001 From: Steven Zhang <35498506+stevenpyzhang@users.noreply.github.com> Date: Fri, 27 Sep 2019 13:59:59 -0700 Subject: [PATCH] fix: address upstream change in KafkaAvroDeserializer (revert previous fix) (#3437) --- .../ksql/test/serde/avro/AvroSerdeSupplier.java | 2 +- .../serde/avro/ValueSpecAvroSerdeSupplier.java | 4 ++-- .../server/resources/streaming/TopicStream.java | 14 +++++++------- .../ksql/serde/avro/KsqlAvroSerializerTest.java | 2 +- 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/ksql-functional-tests/src/main/java/io/confluent/ksql/test/serde/avro/AvroSerdeSupplier.java b/ksql-functional-tests/src/main/java/io/confluent/ksql/test/serde/avro/AvroSerdeSupplier.java index 9aaf27c74bfc..7f395a248ee3 100644 --- a/ksql-functional-tests/src/main/java/io/confluent/ksql/test/serde/avro/AvroSerdeSupplier.java +++ b/ksql-functional-tests/src/main/java/io/confluent/ksql/test/serde/avro/AvroSerdeSupplier.java @@ -30,6 +30,6 @@ public Serializer getSerializer(final SchemaRegistryClient schemaRegistr @Override public Deserializer getDeserializer(final SchemaRegistryClient schemaRegistryClient) { - return new KafkaAvroDeserializer<>(schemaRegistryClient); + return new KafkaAvroDeserializer(schemaRegistryClient); } } \ No newline at end of file diff --git a/ksql-functional-tests/src/main/java/io/confluent/ksql/test/serde/avro/ValueSpecAvroSerdeSupplier.java b/ksql-functional-tests/src/main/java/io/confluent/ksql/test/serde/avro/ValueSpecAvroSerdeSupplier.java index dc4c20d0d284..abf9dee81e03 100644 --- a/ksql-functional-tests/src/main/java/io/confluent/ksql/test/serde/avro/ValueSpecAvroSerdeSupplier.java +++ b/ksql-functional-tests/src/main/java/io/confluent/ksql/test/serde/avro/ValueSpecAvroSerdeSupplier.java @@ -247,11 +247,11 @@ public Set> entrySet() { private static final class ValueSpecAvroDeserializer implements Deserializer { private final SchemaRegistryClient schemaRegistryClient; - private final KafkaAvroDeserializer avroDeserializer; + private final KafkaAvroDeserializer avroDeserializer; ValueSpecAvroDeserializer(final SchemaRegistryClient schemaRegistryClient) { this.schemaRegistryClient = schemaRegistryClient; - this.avroDeserializer = new KafkaAvroDeserializer<>(schemaRegistryClient); + this.avroDeserializer = new KafkaAvroDeserializer(schemaRegistryClient); } @Override diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/TopicStream.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/TopicStream.java index 82ea8ed7f6db..c24b1e132b28 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/TopicStream.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/TopicStream.java @@ -50,7 +50,7 @@ public static class RecordFormatter { private static final Logger log = LoggerFactory.getLogger(RecordFormatter.class); - private final KafkaAvroDeserializer avroDeserializer; + private final KafkaAvroDeserializer avroDeserializer; private final String topicName; private final DateFormat dateFormat = SimpleDateFormat.getDateTimeInstance(3, 1, Locale.getDefault()); @@ -60,7 +60,7 @@ public static class RecordFormatter { public RecordFormatter(final SchemaRegistryClient schemaRegistryClient, final String topicName) { this.topicName = Objects.requireNonNull(topicName, "topicName"); - this.avroDeserializer = new KafkaAvroDeserializer<>(schemaRegistryClient); + this.avroDeserializer = new KafkaAvroDeserializer(schemaRegistryClient); } public List format(final ConsumerRecords records) { @@ -112,7 +112,7 @@ enum Format { public Optional maybeGetFormatter( final String topicName, final ConsumerRecord record, - final KafkaAvroDeserializer avroDeserializer, + final KafkaAvroDeserializer avroDeserializer, final DateFormat dateFormat) { try { avroDeserializer.deserialize(topicName, record.value().get()); @@ -123,7 +123,7 @@ public Optional maybeGetFormatter( } private Formatter createFormatter(final String topicName, - final KafkaAvroDeserializer avroDeserializer, + final KafkaAvroDeserializer avroDeserializer, final DateFormat dateFormat) { return new Formatter() { @Override @@ -149,7 +149,7 @@ public Format getFormat() { public Optional maybeGetFormatter( final String topicName, final ConsumerRecord record, - final KafkaAvroDeserializer avroDeserializer, + final KafkaAvroDeserializer avroDeserializer, final DateFormat dateFormat) { try { final JsonNode jsonNode = JsonMapper.INSTANCE.mapper.readTree(record.value().toString()); @@ -198,7 +198,7 @@ public Format getFormat() { public Optional maybeGetFormatter( final String topicName, final ConsumerRecord record, - final KafkaAvroDeserializer avroDeserializer, + final KafkaAvroDeserializer avroDeserializer, final DateFormat dateFormat) { // STRING always returns a formatter because its last in the enum list return Optional.of(createFormatter(dateFormat)); @@ -226,7 +226,7 @@ public Format getFormat() { Optional maybeGetFormatter( final String topicName, final ConsumerRecord record, - final KafkaAvroDeserializer avroDeserializer, + final KafkaAvroDeserializer avroDeserializer, final DateFormat dateFormat) { return Optional.empty(); } diff --git a/ksql-serde/src/test/java/io/confluent/ksql/serde/avro/KsqlAvroSerializerTest.java b/ksql-serde/src/test/java/io/confluent/ksql/serde/avro/KsqlAvroSerializerTest.java index 9e5cbdb722b9..68ed8450994c 100644 --- a/ksql-serde/src/test/java/io/confluent/ksql/serde/avro/KsqlAvroSerializerTest.java +++ b/ksql-serde/src/test/java/io/confluent/ksql/serde/avro/KsqlAvroSerializerTest.java @@ -169,7 +169,7 @@ public void setup() { AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "" ); - deserializer = new KafkaAvroDeserializer<>(schemaRegistryClient, configs); + deserializer = new KafkaAvroDeserializer(schemaRegistryClient, configs); orderStruct = new Struct(ORDER_SCHEMA) .put(ORDERTIME, 1511897796092L)