From a77316a064e445c4aebc944cc61bb9c41d035829 Mon Sep 17 00:00:00 2001 From: Almog Gavra Date: Wed, 18 Nov 2020 11:02:50 -0800 Subject: [PATCH] feat: add partitions to PRINT TOPIC output --- .../rest/server/resources/streaming/RecordFormatter.java | 3 ++- .../server/resources/streaming/RecordFormatterTest.java | 9 +++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/RecordFormatter.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/RecordFormatter.java index 214eac4ee348..8a5a3ac765fb 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/RecordFormatter.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/RecordFormatter.java @@ -157,7 +157,8 @@ private List formatRecords(final Iterable> private String formatRecord(final ConsumerRecord record) { return "rowtime: " + formatRowTime(record.timestamp()) + ", " + "key: " + keyDeserializers.format(record.key()) - + ", value: " + valueDeserializers.format(record.value()); + + ", value: " + valueDeserializers.format(record.value()) + + ", partition: " + record.partition(); } private static String formatRowTime(final long timestamp) { diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/RecordFormatterTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/RecordFormatterTest.java index eea8b566761b..8809c734b7f3 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/RecordFormatterTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/RecordFormatterTest.java @@ -207,6 +207,15 @@ public void shouldFormatRowTime() { assertThat(formatted, containsString("rowtime: 2020/05/01 14:45:13.111 Z, ")); } + @Test + public void shouldFormatPartition() { + // When: + final String formatted = formatSingle(consumerRecord(null, null)); + + // Then: + assertThat(formatted, containsString("partition: 1")); + } + @Test public void shouldFormatNoRowTime() { // Given: