diff --git a/ksqldb-cli/src/main/java/io/confluent/ksql/cli/console/Console.java b/ksqldb-cli/src/main/java/io/confluent/ksql/cli/console/Console.java
index ccc1b344a3d0..e6e53587eb77 100644
--- a/ksqldb-cli/src/main/java/io/confluent/ksql/cli/console/Console.java
+++ b/ksqldb-cli/src/main/java/io/confluent/ksql/cli/console/Console.java
@@ -50,7 +50,7 @@
 import io.confluent.ksql.rest.entity.CommandStatusEntity;
 import io.confluent.ksql.rest.entity.ConnectorDescription;
 import io.confluent.ksql.rest.entity.ConnectorList;
-import io.confluent.ksql.rest.entity.ConsumerGroupOffsets;
+import io.confluent.ksql.rest.entity.SourceConsumerOffsets;
 import io.confluent.ksql.rest.entity.CreateConnectorEntity;
 import io.confluent.ksql.rest.entity.DropConnectorEntity;
 import io.confluent.ksql.rest.entity.ErrorEntity;
@@ -619,16 +619,16 @@ private void printSourceDescription(final SourceDescription source) {
         "Statistics of the local KSQL server interaction with the Kafka topic "
             + source.getTopic()
     ));
-    writer().println();
-    Optional<ConsumerGroupOffsets> consumerGroupOffsetsOptional = source.getConsumerGroupOffsets();
+    Optional<SourceConsumerOffsets> consumerGroupOffsetsOptional = source.getConsumerGroupOffsets();
     if (consumerGroupOffsetsOptional.isPresent()) {
-      ConsumerGroupOffsets consumerGroupOffsets = consumerGroupOffsetsOptional.get();
-      writer().println(String.format("%-20s : %s", "Consumer Group", consumerGroupOffsets.getGroupId()));
-      writer().println(String.format("%-20s : %s", "Kafka topic", consumerGroupOffsets.getKafkaTopic()));
+      writer().println();
+      SourceConsumerOffsets sourceConsumerOffsets = consumerGroupOffsetsOptional.get();
+      writer().println(String.format("%-20s : %s", "Consumer Group", sourceConsumerOffsets.getGroupId()));
+      writer().println(String.format("%-20s : %s", "Kafka topic", sourceConsumerOffsets.getKafkaTopic()));
       writer().println("");
       final Table taskTable = new Table.Builder()
           .withColumnHeaders(ImmutableList.of("Partition", "Start Offset", "End Offset", "Offset", "Lag"))
-          .withRows(consumerGroupOffsets.getOffsets()
+          .withRows(sourceConsumerOffsets.getOffsets()
               .stream()
               .map(offset -> ImmutableList.of(
                   String.valueOf(offset.getPartition()),
diff --git a/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java
index adb11cbedcc3..06cf65759726 100644
--- a/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java
+++ b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java
@@ -67,6 +67,8 @@
 import io.confluent.ksql.rest.entity.RunningQuery;
 import io.confluent.ksql.rest.entity.SchemaInfo;
 import io.confluent.ksql.rest.entity.SimpleConnectorInfo;
+import io.confluent.ksql.rest.entity.SourceConsumerOffset;
+import io.confluent.ksql.rest.entity.SourceConsumerOffsets;
 import io.confluent.ksql.rest.entity.SourceDescription;
 import io.confluent.ksql.rest.entity.SourceDescriptionEntity;
 import io.confluent.ksql.rest.entity.SourceInfo;
@@ -631,7 +633,8 @@ public void testPrintSourceDescription() {
         + "    \"topic\" : \"kadka-topic\"," + NEWLINE
         + "    \"partitions\" : 1," + NEWLINE
         + "    \"replication\" : 1," + NEWLINE
-        + "    \"statement\" : \"sql statement\"" + NEWLINE
+        + "    \"statement\" : \"sql statement\"," + NEWLINE
+        + "    \"consumerGroupOffsets\" : null" + NEWLINE
         + "  }," + NEWLINE
         + "  \"warnings\" : [ ]" + NEWLINE
         + "} ]" + NEWLINE));
@@ -769,7 +772,8 @@ public void testPrintConnectorDescription() {
         + "    \"topic\" : \"kadka-topic\"," + NEWLINE
         + "    \"partitions\" : 2," + NEWLINE
         + "    \"replication\" : 1," + NEWLINE
-        + "    \"statement\" : \"statement\"" + NEWLINE
+        + "    \"statement\" : \"statement\"," + NEWLINE
+        + "    \"consumerGroupOffsets\" : null" + NEWLINE
         + "  } ]," + NEWLINE
         + "  \"topics\" : [ \"a-jdbc-topic\" ]," + NEWLINE
         + "  \"warnings\" : [ ]" + NEWLINE
@@ -1120,6 +1124,174 @@ public void testPrintExecuptionPlan() {
     }
   }
 
+  @Test
+  public void shouldPrintTopicDescribeExtendedWithConsumerOffsets() {
+    // Given:
+    final List<RunningQuery> readQueries = ImmutableList.of(
+        new RunningQuery("read query", ImmutableSet.of("sink1"), ImmutableSet.of("sink1 topic"), new QueryId("readId"), queryStatusCount, KsqlConstants.KsqlQueryType.PERSISTENT)
+    );
+    final List<RunningQuery> writeQueries = ImmutableList.of(
+        new RunningQuery("write query", ImmutableSet.of("sink2"), ImmutableSet.of("sink2 topic"), new QueryId("writeId"), queryStatusCount, KsqlConstants.KsqlQueryType.PERSISTENT)
+    );
+
+    final KsqlEntityList entityList = new KsqlEntityList(ImmutableList.of(
+        new SourceDescriptionEntity(
+            "e",
+            new SourceDescription(
+                "TestSource",
+                Optional.empty(),
+                readQueries,
+                writeQueries,
+                buildTestSchema(SqlTypes.STRING),
+                DataSourceType.KTABLE.getKsqlType(),
+                "2000-01-01",
+                "stats",
+                "errors",
+                true,
+                "kafka",
+                "avro",
+                "kadka-topic",
+                2, 1,
+                "sql statement text",
+                Optional.of(
+                    new SourceConsumerOffsets(
+                        "consumer1",
+                        "kadka-topic",
+                        ImmutableList.of(
+                            new SourceConsumerOffset(0, 100, 900, 800),
+                            new SourceConsumerOffset(1, 50, 900, 900)
+                        ))
+                )),
+            Collections.emptyList()
+        ))
+    );
+
+    // When:
+    console.printKsqlEntityList(entityList);
+
+    // Then:
+    final String output = terminal.getOutputString();
+    if (console.getOutputFormat() == OutputFormat.JSON) {
+      assertThat(output, is("[ {" + NEWLINE
+          + "  \"@type\" : \"sourceDescription\"," + NEWLINE
+          + "  \"statementText\" : \"e\"," + NEWLINE
+          + "  \"sourceDescription\" : {" + NEWLINE
+          + "    \"name\" : \"TestSource\"," + NEWLINE
+          + "    \"windowType\" : null," + NEWLINE
+          + "    \"readQueries\" : [ {" + NEWLINE
+          + "      \"queryString\" : \"read query\"," + NEWLINE
+          + "      \"sinks\" : [ \"sink1\" ]," + NEWLINE
+          + "      \"sinkKafkaTopics\" : [ \"sink1 topic\" ]," + NEWLINE
+          + "      \"id\" : \"readId\"," + NEWLINE
+          + "      \"statusCount\" : {" + NEWLINE
+          + "        \"RUNNING\" : 1," + NEWLINE
+          + "        \"ERROR\" : 2" + NEWLINE
+          + "      }," + NEWLINE
+          + "      \"queryType\" : \"PERSISTENT\"," + NEWLINE
+          + "      \"state\" : \"" + AGGREGATE_STATUS + "\"" + NEWLINE
+          + "    } ]," + NEWLINE
+          + "    \"writeQueries\" : [ {" + NEWLINE
+          + "      \"queryString\" : \"write query\"," + NEWLINE
+          + "      \"sinks\" : [ \"sink2\" ]," + NEWLINE
+          + "      \"sinkKafkaTopics\" : [ \"sink2 topic\" ]," + NEWLINE
+          + "      \"id\" : \"writeId\"," + NEWLINE
+          + "      \"statusCount\" : {" + NEWLINE
+          + "        \"RUNNING\" : 1," + NEWLINE
+          + "        \"ERROR\" : 2" + NEWLINE
+          + "      }," + NEWLINE
+          + "      \"queryType\" : \"PERSISTENT\"," + NEWLINE
+          + "      \"state\" : \"" + AGGREGATE_STATUS + "\"" + NEWLINE
+          + "    } ]," + NEWLINE
+          + "    \"fields\" : [ {" + NEWLINE
+          + "      \"name\" : \"ROWKEY\"," + NEWLINE
+          + "      \"schema\" : {" + NEWLINE
+          + "        \"type\" : \"STRING\"," + NEWLINE
+          + "        \"fields\" : null," + NEWLINE
+          + "        \"memberSchema\" : null" + NEWLINE
+          + "      }," + NEWLINE
+          + "      \"type\" : \"KEY\"" + NEWLINE
+          + "    }, {" + NEWLINE
+          + "      \"name\" : \"f_0\"," + NEWLINE
+          + "      \"schema\" : {" + NEWLINE
+          + "        \"type\" : \"STRING\"," + NEWLINE
+          + "        \"fields\" : null," + NEWLINE
+          + "        \"memberSchema\" : null" + NEWLINE
+          + "      }" + NEWLINE
+          + "    } ]," + NEWLINE
+          + "    \"type\" : \"TABLE\"," + NEWLINE
+          + "    \"timestamp\" : \"2000-01-01\"," + NEWLINE
+          + "    \"statistics\" : \"stats\"," + NEWLINE
+          + "    \"errorStats\" : \"errors\"," + NEWLINE
+          + "    \"extended\" : true," + NEWLINE
+          + "    \"keyFormat\" : \"kafka\"," + NEWLINE
+          + "    \"valueFormat\" : \"avro\"," + NEWLINE
+          + "    \"topic\" : \"kadka-topic\"," + NEWLINE
+          + "    \"partitions\" : 2," + NEWLINE
+          + "    \"replication\" : 1," + NEWLINE
+          + "    \"statement\" : \"sql statement text\"," + NEWLINE
+          + "    \"consumerGroupOffsets\" : {" + NEWLINE
+          + "      \"groupId\" : \"consumer1\"," + NEWLINE
+          + "      \"kafkaTopic\" : \"kadka-topic\"," + NEWLINE
+          + "      \"offsets\" : [ {" + NEWLINE
+          + "        \"partition\" : 0," + NEWLINE
+          + "        \"logStartOffset\" : 100," + NEWLINE
+          + "        \"logEndOffset\" : 900," + NEWLINE
+          + "        \"consumerOffset\" : 800" + NEWLINE
+          + "      }, {" + NEWLINE
+          + "        \"partition\" : 1," + NEWLINE
+          + "        \"logStartOffset\" : 50," + NEWLINE
+          + "        \"logEndOffset\" : 900," + NEWLINE
+          + "        \"consumerOffset\" : 900" + NEWLINE
+          + "      } ]" + NEWLINE
+          + "    }" + NEWLINE
+          + "  }," + NEWLINE
+          + "  \"warnings\" : [ ]" + NEWLINE
+          + "} ]" + NEWLINE));
+    } else {
+      assertThat(output, is("" + NEWLINE
+          + "Name                 : TestSource" + NEWLINE
+          + "Type                 : TABLE" + NEWLINE
+          + "Timestamp field      : 2000-01-01" + NEWLINE
+          + "Key format           : kafka" + NEWLINE
+          + "Value format         : avro" + NEWLINE
+          + "Kafka topic          : kadka-topic (partitions: 2, replication: 1)" + NEWLINE
+          + "Statement            : sql statement text" + NEWLINE
+          + "" + NEWLINE
+          + " Field  | Type                          " + NEWLINE
+          + "----------------------------------------" + NEWLINE
+          + " ROWKEY | VARCHAR(STRING) (primary key) " + NEWLINE
+          + " f_0    | VARCHAR(STRING)               " + NEWLINE
+          + "----------------------------------------" + NEWLINE
+          + "" + NEWLINE
+          + "Queries that read from this TABLE" + NEWLINE
+          + "-----------------------------------" + NEWLINE
+          + "readId (" + AGGREGATE_STATUS + ") : read query" + NEWLINE
+          + "\n"
+          + "For query topology and execution plan please run: EXPLAIN <Query ID>;" + NEWLINE
+          + "" + NEWLINE
+          + "Queries that write from this TABLE" + NEWLINE
+          + "-----------------------------------" + NEWLINE
+          + "writeId (" + AGGREGATE_STATUS + ") : write query" + NEWLINE
+          + "\n"
+          + "For query topology and execution plan please run: EXPLAIN <Query ID>;" + NEWLINE
+          + "" + NEWLINE
+          + "Local runtime statistics" + NEWLINE
+          + "------------------------" + NEWLINE
+          + "stats" + NEWLINE
+          + "errors" + NEWLINE
+          + "(Statistics of the local KSQL server interaction with the Kafka topic kadka-topic)" + NEWLINE
+          + NEWLINE
+          + "Consumer Group       : consumer1" + NEWLINE
+          + "Kafka topic          : kadka-topic" + NEWLINE
+          + NEWLINE
+          + " Partition | Start Offset | End Offset | Offset | Lag " + NEWLINE
+          + "------------------------------------------------------" + NEWLINE
+          + " 0         | 100          | 900        | 800    | 100 " + NEWLINE
+          + " 1         | 50           | 900        | 900    | 0   " + NEWLINE
+          + "------------------------------------------------------" + NEWLINE));
+    }
+  }
+
   @Test
   public void shouldPrintTopicDescribeExtended() {
     // Given:
@@ -1216,7 +1388,8 @@ public void shouldPrintTopicDescribeExtended() {
         + "    \"topic\" : \"kadka-topic\"," + NEWLINE
         + "    \"partitions\" : 2," + NEWLINE
         + "    \"replication\" : 1," + NEWLINE
-        + "    \"statement\" : \"sql statement text\"" + NEWLINE
+        + "    \"statement\" : \"sql statement text\"," + NEWLINE
+        + "    \"consumerGroupOffsets\" : null" + NEWLINE
         + "  }," + NEWLINE
         + "  \"warnings\" : [ ]" + NEWLINE
         + "} ]" + NEWLINE));
diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/entity/SourceDescriptionFactory.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/entity/SourceDescriptionFactory.java
index bbf2c5a4a279..c0d61c791246 100644
--- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/entity/SourceDescriptionFactory.java
+++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/entity/SourceDescriptionFactory.java
@@ -71,32 +71,32 @@ public static SourceDescription create(
         topicDescription.map(td -> td.partitions().get(0).replicas().size()).orElse(0),
         dataSource.getSqlExpression(),
         topicDescription.flatMap(td -> consumerGroupDescription.map(cg ->
-            new ConsumerGroupOffsets(cg.groupId(), td.name(),
+            new SourceConsumerOffsets(cg.groupId(), td.name(),
                 consumerOffsets(td, topicAndStartOffsets, topicAndEndOffsets,
                     topicAndConsumerOffsets)))));
   }
 
-  private static List<ConsumerOffset> consumerOffsets(
+  private static List<SourceConsumerOffset> consumerOffsets(
       final TopicDescription topicDescription,
       final Map<TopicPartition, ListOffsetsResultInfo> topicAndStartOffsets,
       final Map<TopicPartition, ListOffsetsResultInfo> topicAndEndOffsets,
       final Map<TopicPartition, OffsetAndMetadata> topicAndConsumerOffsets
   ) {
-    List<ConsumerOffset> consumerOffsets = new ArrayList<>();
+    List<SourceConsumerOffset> sourceConsumerOffsets = new ArrayList<>();
     for (TopicPartitionInfo topicPartitionInfo : topicDescription.partitions()) {
       final TopicPartition tp = new TopicPartition(topicDescription.name(),
           topicPartitionInfo.partition());
       ListOffsetsResultInfo startOffsetResultInfo = topicAndStartOffsets.get(tp);
       ListOffsetsResultInfo endOffsetResultInfo = topicAndEndOffsets.get(tp);
       OffsetAndMetadata offsetAndMetadata = topicAndConsumerOffsets.get(tp);
-      consumerOffsets.add(
-          new ConsumerOffset(
+      sourceConsumerOffsets.add(
+          new SourceConsumerOffset(
               topicPartitionInfo.partition(),
               startOffsetResultInfo != null ? startOffsetResultInfo.offset() : 0,
               endOffsetResultInfo != null ? endOffsetResultInfo.offset() : 0,
               offsetAndMetadata != null ? offsetAndMetadata.offset() : 0
           ));
     }
-    return consumerOffsets;
+    return sourceConsumerOffsets;
   }
 }
diff --git a/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/ConsumerOffset.java b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/SourceConsumerOffset.java
similarity index 92%
rename from ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/ConsumerOffset.java
rename to ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/SourceConsumerOffset.java
index 650f301e4d88..810123713e84 100644
--- a/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/ConsumerOffset.java
+++ b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/SourceConsumerOffset.java
@@ -6,7 +6,7 @@
 import java.util.Objects;
 
 @JsonIgnoreProperties(ignoreUnknown = true)
-public class ConsumerOffset {
+public class SourceConsumerOffset {
 
   private final int partition;
   private final long logStartOffset;
@@ -14,7 +14,7 @@ public class ConsumerOffset {
   private final long consumerOffset;
 
   @JsonCreator
-  public ConsumerOffset(
+  public SourceConsumerOffset(
       @JsonProperty("partition") int partition,
       @JsonProperty("logStartOffset") long logStartOffset,
       @JsonProperty("logEndOffset") long logEndOffset,
@@ -50,7 +50,7 @@ public boolean equals(Object o) {
     if (o == null || getClass() != o.getClass()) {
       return false;
     }
-    ConsumerOffset that = (ConsumerOffset) o;
+    SourceConsumerOffset that = (SourceConsumerOffset) o;
     return partition == that.partition &&
         logStartOffset == that.logStartOffset &&
         logEndOffset == that.logEndOffset &&
diff --git a/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/ConsumerGroupOffsets.java b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/SourceConsumerOffsets.java
similarity index 80%
rename from ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/ConsumerGroupOffsets.java
rename to ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/SourceConsumerOffsets.java
index e421534ca353..3b4a9a26936f 100644
--- a/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/ConsumerGroupOffsets.java
+++ b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/SourceConsumerOffsets.java
@@ -7,16 +7,16 @@
 import java.util.Objects;
 
 @JsonIgnoreProperties(ignoreUnknown = true)
-public class ConsumerGroupOffsets {
+public class SourceConsumerOffsets {
 
   private final String groupId;
   private final String kafkaTopic;
-  private final List<ConsumerOffset> offsets;
+  private final List<SourceConsumerOffset> offsets;
 
   @JsonCreator
-  public ConsumerGroupOffsets(
+  public SourceConsumerOffsets(
       @JsonProperty("groupId") String groupId,
       @JsonProperty("kafkaTopic") String kafkaTopic,
-      @JsonProperty("offsets") List<ConsumerOffset> offsets
+      @JsonProperty("offsets") List<SourceConsumerOffset> offsets
   ) {
     this.groupId = groupId;
     this.kafkaTopic = kafkaTopic;
@@ -31,7 +31,7 @@ public String getKafkaTopic() {
     return kafkaTopic;
   }
 
-  public List<ConsumerOffset> getOffsets() {
+  public List<SourceConsumerOffset> getOffsets() {
     return offsets;
   }
 
@@ -43,7 +43,7 @@ public boolean equals(Object o) {
     if (o == null || getClass() != o.getClass()) {
       return false;
     }
-    ConsumerGroupOffsets that = (ConsumerGroupOffsets) o;
+    SourceConsumerOffsets that = (SourceConsumerOffsets) o;
     return Objects.equals(groupId, that.groupId) &&
         Objects.equals(kafkaTopic, that.kafkaTopic) &&
         Objects.equals(offsets, that.offsets);
diff --git a/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/SourceDescription.java b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/SourceDescription.java
index 2663e7e3ce5c..d0edd47eecbb 100644
--- a/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/SourceDescription.java
+++ b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/SourceDescription.java
@@ -47,7 +47,7 @@ public class SourceDescription {
   private final int partitions;
   private final int replication;
   private final String statement;
-  private final Optional<ConsumerGroupOffsets> consumerGroupOffsets;
+  private final Optional<SourceConsumerOffsets> consumerOffsets;
 
   // CHECKSTYLE_RULES.OFF: ParameterNumberCheck
   @JsonCreator
@@ -68,7 +68,7 @@ public SourceDescription(
       @JsonProperty("partitions") final int partitions,
       @JsonProperty("replication") final int replication,
       @JsonProperty("statement") final String statement,
-      @JsonProperty("consumerGroupOffsets") final Optional<ConsumerGroupOffsets> consumerGroupOffsets) {
+      @JsonProperty("consumerGroupOffsets") final Optional<SourceConsumerOffsets> consumerOffsets) {
     // CHECKSTYLE_RULES.ON: ParameterNumberCheck
     this.name = Objects.requireNonNull(name, "name");
     this.windowType = Objects.requireNonNull(windowType, "windowType");
@@ -89,7 +89,7 @@ public SourceDescription(
     this.partitions = partitions;
     this.replication = replication;
     this.statement = Objects.requireNonNull(statement, "statement");
-    this.consumerGroupOffsets = Objects.requireNonNull(consumerGroupOffsets, "consumerGroupOffsets");
+    this.consumerOffsets = Objects.requireNonNull(consumerOffsets, "consumerOffsets");
   }
 
   public String getStatement() {
@@ -156,8 +156,8 @@ public String getErrorStats() {
     return errorStats;
   }
 
-  public Optional<ConsumerGroupOffsets> getConsumerGroupOffsets() {
-    return consumerGroupOffsets;
+  public Optional<SourceConsumerOffsets> getConsumerGroupOffsets() {
+    return consumerOffsets;
   }
 
   // CHECKSTYLE_RULES.OFF: CyclomaticComplexity
@@ -187,7 +187,7 @@ public boolean equals(final Object o) {
         && Objects.equals(valueFormat, that.valueFormat)
         && Objects.equals(topic, that.topic)
         && Objects.equals(statement, that.statement)
-        && Objects.equals(consumerGroupOffsets, that.consumerGroupOffsets);
+        && Objects.equals(consumerOffsets, that.consumerOffsets);
   }
 
   @Override
@@ -209,7 +209,7 @@ public int hashCode() {
         partitions,
         replication,
         statement,
-        consumerGroupOffsets
+        consumerOffsets
     );
   }
 }