diff --git a/src/main/java/org/akhq/models/Record.java b/src/main/java/org/akhq/models/Record.java index 4e8739fc1..9e83e4f3f 100644 --- a/src/main/java/org/akhq/models/Record.java +++ b/src/main/java/org/akhq/models/Record.java @@ -8,7 +8,6 @@ import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.kafka.schemaregistry.json.JsonSchema; import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema; -import io.micronaut.context.annotation.Value; import kafka.coordinator.group.GroupMetadataManager; import kafka.coordinator.transaction.TransactionLog; import kafka.coordinator.transaction.TxnKey; @@ -16,6 +15,7 @@ import org.akhq.configs.SchemaRegistryType; import org.akhq.utils.AvroToJsonDeserializer; import org.akhq.utils.AvroToJsonSerializer; +import org.akhq.utils.ContentUtils; import org.akhq.utils.ProtobufToJsonDeserializer; import org.apache.avro.generic.GenericRecord; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -115,7 +115,7 @@ public Record(SchemaRegistryClient client, ConsumerRecord record this.bytesValue = bytesValue; this.valueSchemaId = getAvroSchemaId(this.bytesValue); for (Header header: record.headers()) { - String headerValue = header.value() != null ? new String(header.value()) : null; + String headerValue = String.valueOf(ContentUtils.convertToObject(header.value())); this.headers.add(new KeyValue<>(header.key(), headerValue)); } diff --git a/src/main/java/org/akhq/utils/ContentUtils.java b/src/main/java/org/akhq/utils/ContentUtils.java new file mode 100644 index 000000000..f8b685695 --- /dev/null +++ b/src/main/java/org/akhq/utils/ContentUtils.java @@ -0,0 +1,93 @@ +package org.akhq.utils; + +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; +import java.util.regex.Pattern; + +public class ContentUtils { + + /** + * Detects if bytes contain a UTF-8 string or something else + * Source: https://stackoverflow.com/questions/1193200/how-can-i-check-whether-a-byte-array-contains-a-unicode-string-in-java + * @param value the bytes to test for a UTF-8 encoded {@code java.lang.String} value + * @return true, if the byte[] contains a UTF-8 encode {@code java.lang.String}, false if it hold something else (e.g. a {@code int) + * @throws UnsupportedEncodingException + */ + private static boolean isValidUTF8(byte[] value) throws UnsupportedEncodingException + { + Pattern p = Pattern.compile("\\A(\n" + + " [\\x09\\x0A\\x0D\\x20-\\x7E] # ASCII\\n" + + "| [\\xC2-\\xDF][\\x80-\\xBF] # non-overlong 2-byte\n" + + "| \\xE0[\\xA0-\\xBF][\\x80-\\xBF] # excluding overlongs\n" + + "| [\\xE1-\\xEC\\xEE\\xEF][\\x80-\\xBF]{2} # straight 3-byte\n" + + "| \\xED[\\x80-\\x9F][\\x80-\\xBF] # excluding surrogates\n" + + "| \\xF0[\\x90-\\xBF][\\x80-\\xBF]{2} # planes 1-3\n" + + "| [\\xF1-\\xF3][\\x80-\\xBF]{3} # planes 4-15\n" + + "| \\xF4[\\x80-\\x8F][\\x80-\\xBF]{2} # plane 16\n" + + ")*\\z", Pattern.COMMENTS); + + String phonyString = new String(value, "ISO-8859-1"); + return p.matcher(phonyString).matches(); + } + + /** + * Converts bytes to long. + * + * @param value the bytes to convert in to a long + * @return the long build from the given bytes + */ + private static Long asLong(byte[] value) { + return value != null ? ByteBuffer.wrap(value).getLong() : null; + } + + /** + * Converts the given bytes to {@code int}. + * + * @param value the bytes to convert into a {@code int} + * @return the {@code int} build from the given bytes + */ + private static Integer asInt(byte[] value) { + return value != null ? ByteBuffer.wrap(value).getInt() : null; + } + + /** + * Converts the given bytes to {@code short}. + * + * @param value the bytes to convert into a {@code short} + * @return the {@code short} build from the given bytes + */ + private static Short asShort(byte[] value) { + return value != null ? ByteBuffer.wrap(value).getShort() : null; + } + + /** + * Converts the given bytes either into a {@code java.lang.string}, {@code int}, {@code long} or {@code short} depending on the content it contains. + * @param value the bytes to convert + * @return the value as an {@code java.lang.string}, {@code int}, {@code long} or {@code short} + */ + public static Object convertToObject(byte[] value) { + Object valueAsObject = null; + + if (value != null) { + try { + if (ContentUtils.isValidUTF8(value)) { + valueAsObject = new String(value); + } else { + try { + valueAsObject = ContentUtils.asLong(value); + } catch (Exception e) { + try { + valueAsObject = ContentUtils.asInt(value); + } catch (Exception ex) { + valueAsObject = ContentUtils.asShort(value); + } + } + } + } catch(UnsupportedEncodingException ex) { + valueAsObject = "[encoding error]"; + } + } + return valueAsObject; + } + +} diff --git a/src/test/java/org/akhq/utils/ContentUtilsTest.java b/src/test/java/org/akhq/utils/ContentUtilsTest.java new file mode 100644 index 000000000..a028a73bd --- /dev/null +++ b/src/test/java/org/akhq/utils/ContentUtilsTest.java @@ -0,0 +1,73 @@ +package org.akhq.utils; + +import org.akhq.models.Record; +import org.apache.kafka.common.header.Header; +import org.junit.jupiter.api.Test; + +import javax.swing.text.AbstractDocument; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class ContentUtilsTest { + + private static byte[] toBytes(Short value) { + ByteBuffer buffer = ByteBuffer.allocate(Short.BYTES); + buffer.putShort(value); + return buffer.array(); + } + + private static byte[] toBytes(Integer value) { + ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES); + buffer.putInt(value); + return buffer.array(); + } + + private static byte[] toBytes(Long value) { + ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES); + buffer.putLong(value); + return buffer.array(); + } + + private static byte[] toBytes(Float value) { + ByteBuffer buffer = ByteBuffer.allocate(Float.BYTES); + buffer.putFloat(value); + return buffer.array(); + } + + private static byte[] toBytes(Double value) { + ByteBuffer buffer = ByteBuffer.allocate(Double.BYTES); + buffer.putDouble(value); + return buffer.array(); + } + + @Test + void testHeaderValueStringUTF8() { + String testValue = "Test"; + + assertEquals(testValue, ContentUtils.convertToObject(testValue.getBytes(StandardCharsets.UTF_8))); + } + + @Test + void testHeaderValueInteger() { + int testValue = 1; + + assertEquals(testValue, ContentUtils.convertToObject(toBytes(testValue))); + } + + @Test + void testHeaderValueLong() { + long testValue = 111l; + + assertEquals(testValue, ContentUtils.convertToObject(toBytes(testValue))); + } + + @Test + void testHeaderValueShort() { + short testValue = 10; + + assertEquals(testValue, ContentUtils.convertToObject(toBytes(testValue))); + } + +}