Skip to content

Commit

Permalink
fix: address upstream change in KafkaAvroDeserializer (revert previou…
Browse files Browse the repository at this point in the history
…s fix) (#3437)
  • Loading branch information
stevenpyzhang authored Sep 27, 2019
1 parent 84c148b commit bed164b
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,6 @@ public Serializer<Object> getSerializer(final SchemaRegistryClient schemaRegistr

@Override
public Deserializer<Object> getDeserializer(final SchemaRegistryClient schemaRegistryClient) {
return new KafkaAvroDeserializer<>(schemaRegistryClient);
return new KafkaAvroDeserializer(schemaRegistryClient);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -247,11 +247,11 @@ public Set<Entry<Object, Object>> entrySet() {
private static final class ValueSpecAvroDeserializer implements Deserializer<Object> {

private final SchemaRegistryClient schemaRegistryClient;
private final KafkaAvroDeserializer<?> avroDeserializer;
private final KafkaAvroDeserializer avroDeserializer;

ValueSpecAvroDeserializer(final SchemaRegistryClient schemaRegistryClient) {
this.schemaRegistryClient = schemaRegistryClient;
this.avroDeserializer = new KafkaAvroDeserializer<>(schemaRegistryClient);
this.avroDeserializer = new KafkaAvroDeserializer(schemaRegistryClient);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public static class RecordFormatter {

private static final Logger log = LoggerFactory.getLogger(RecordFormatter.class);

private final KafkaAvroDeserializer<?> avroDeserializer;
private final KafkaAvroDeserializer avroDeserializer;
private final String topicName;
private final DateFormat dateFormat =
SimpleDateFormat.getDateTimeInstance(3, 1, Locale.getDefault());
Expand All @@ -60,7 +60,7 @@ public static class RecordFormatter {
public RecordFormatter(final SchemaRegistryClient schemaRegistryClient,
final String topicName) {
this.topicName = Objects.requireNonNull(topicName, "topicName");
this.avroDeserializer = new KafkaAvroDeserializer<>(schemaRegistryClient);
this.avroDeserializer = new KafkaAvroDeserializer(schemaRegistryClient);
}

public List<String> format(final ConsumerRecords<String, Bytes> records) {
Expand Down Expand Up @@ -112,7 +112,7 @@ enum Format {
public Optional<Formatter> maybeGetFormatter(
final String topicName,
final ConsumerRecord<String, Bytes> record,
final KafkaAvroDeserializer<?> avroDeserializer,
final KafkaAvroDeserializer avroDeserializer,
final DateFormat dateFormat) {
try {
avroDeserializer.deserialize(topicName, record.value().get());
Expand All @@ -123,7 +123,7 @@ public Optional<Formatter> maybeGetFormatter(
}

private Formatter createFormatter(final String topicName,
final KafkaAvroDeserializer<?> avroDeserializer,
final KafkaAvroDeserializer avroDeserializer,
final DateFormat dateFormat) {
return new Formatter() {
@Override
Expand All @@ -149,7 +149,7 @@ public Format getFormat() {
public Optional<Formatter> maybeGetFormatter(
final String topicName,
final ConsumerRecord<String, Bytes> record,
final KafkaAvroDeserializer<?> avroDeserializer,
final KafkaAvroDeserializer avroDeserializer,
final DateFormat dateFormat) {
try {
final JsonNode jsonNode = JsonMapper.INSTANCE.mapper.readTree(record.value().toString());
Expand Down Expand Up @@ -198,7 +198,7 @@ public Format getFormat() {
public Optional<Formatter> maybeGetFormatter(
final String topicName,
final ConsumerRecord<String, Bytes> record,
final KafkaAvroDeserializer<?> avroDeserializer,
final KafkaAvroDeserializer avroDeserializer,
final DateFormat dateFormat) {
// STRING always returns a formatter because its last in the enum list
return Optional.of(createFormatter(dateFormat));
Expand Down Expand Up @@ -226,7 +226,7 @@ public Format getFormat() {
Optional<Formatter> maybeGetFormatter(
final String topicName,
final ConsumerRecord<String, Bytes> record,
final KafkaAvroDeserializer<?> avroDeserializer,
final KafkaAvroDeserializer avroDeserializer,
final DateFormat dateFormat) {
return Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public void setup() {
AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, ""
);

deserializer = new KafkaAvroDeserializer<>(schemaRegistryClient, configs);
deserializer = new KafkaAvroDeserializer(schemaRegistryClient, configs);

orderStruct = new Struct(ORDER_SCHEMA)
.put(ORDERTIME, 1511897796092L)
Expand Down

0 comments on commit bed164b

Please sign in to comment.