diff --git a/ksqldb-cli/src/main/java/io/confluent/ksql/cli/console/Console.java b/ksqldb-cli/src/main/java/io/confluent/ksql/cli/console/Console.java index b353ed090e7d..aea5faeddf44 100644 --- a/ksqldb-cli/src/main/java/io/confluent/ksql/cli/console/Console.java +++ b/ksqldb-cli/src/main/java/io/confluent/ksql/cli/console/Console.java @@ -50,6 +50,7 @@ import io.confluent.ksql.rest.entity.CommandStatusEntity; import io.confluent.ksql.rest.entity.ConnectorDescription; import io.confluent.ksql.rest.entity.ConnectorList; +import io.confluent.ksql.rest.entity.ConsumerGroupOffsets; import io.confluent.ksql.rest.entity.CreateConnectorEntity; import io.confluent.ksql.rest.entity.DropConnectorEntity; import io.confluent.ksql.rest.entity.ErrorEntity; @@ -618,6 +619,24 @@ private void printSourceDescription(final SourceDescription source) { "Statistics of the local KSQL server interaction with the Kafka topic " + source.getTopic() )); + writer().println(); + ConsumerGroupOffsets consumerGroupOffsets = source.getConsumerGroupOffsets(); + writer().println(String.format("%-20s : %s", "Consumer Group", consumerGroupOffsets.getGroupId())); + writer().println(String.format("%-20s : %s", "Kafka topic", consumerGroupOffsets.getKafkaTopic())); + writer().println(""); + final Table taskTable = new Table.Builder() + .withColumnHeaders(ImmutableList.of("Partition", "Start Offset", "End Offset", "Offset", "Lag")) + .withRows(consumerGroupOffsets.getOffsets() + .stream() + .map(offset -> ImmutableList.of( + String.valueOf(offset.getPartition()), + String.valueOf(offset.getLogStartOffset()), + String.valueOf(offset.getLogEndOffset()), + String.valueOf(offset.getConsumerOffset()), + String.valueOf(offset.getLogEndOffset() - offset.getConsumerOffset()) + ))) + .build(); + taskTable.print(this); } private void printSourceDescriptionList(final SourceDescriptionList sourceDescriptionList) { diff --git a/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java index e28947eb9a67..9f55194d21a4 100644 --- a/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java +++ b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java @@ -67,7 +67,7 @@ import io.confluent.ksql.rest.entity.RunningQuery; import io.confluent.ksql.rest.entity.SchemaInfo; import io.confluent.ksql.rest.entity.SimpleConnectorInfo; -import io.confluent.ksql.rest.entity.SourceConsumerGroupOffsets; +import io.confluent.ksql.rest.entity.ConsumerGroupOffsets; import io.confluent.ksql.rest.entity.SourceDescription; import io.confluent.ksql.rest.entity.SourceDescriptionEntity; import io.confluent.ksql.rest.entity.SourceInfo; @@ -142,7 +142,7 @@ public class ConsoleTest { 2, 1, "statement", - new SourceConsumerGroupOffsets("", "kadka-topic", Collections.emptyList())); + new ConsumerGroupOffsets("", "kadka-topic", Collections.emptyList())); @Mock private QueryStatusCount queryStatusCount; @@ -501,7 +501,7 @@ public void testPrintSourceDescription() { 1, 1, "sql statement", - new SourceConsumerGroupOffsets("", "kadka-topic", Collections.emptyList())), + new ConsumerGroupOffsets("", "kadka-topic", Collections.emptyList())), Collections.emptyList() ) )); @@ -1150,7 +1150,7 @@ public void shouldPrintTopicDescribeExtended() { "kadka-topic", 2, 1, "sql statement text", - new SourceConsumerGroupOffsets("", "kadka-topic", Collections.emptyList())), + new ConsumerGroupOffsets("", "kadka-topic", Collections.emptyList())), Collections.emptyList() )) ); diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/entity/SourceDescriptionFactory.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/entity/SourceDescriptionFactory.java index 16ebf5d71526..ac5d7711ff3b 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/entity/SourceDescriptionFactory.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/entity/SourceDescriptionFactory.java @@ -30,6 +30,7 @@ import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionInfo; public final class SourceDescriptionFactory { @@ -43,6 +44,7 @@ public static SourceDescription create( final List writeQueries, final Optional topicDescription, final Optional consumerGroupDescription, + final Map topicAndStartOffsets, final Map topicAndEndOffsets, final Map topicAndConsumerOffsets ) { @@ -69,21 +71,38 @@ public static SourceDescription create( topicDescription.map(td -> td.partitions().size()).orElse(0), topicDescription.map(td -> td.partitions().get(0).replicas().size()).orElse(0), dataSource.getSqlExpression(), - consumerGroupDescription.map( - cg -> new SourceConsumerGroupOffsets(cg.groupId(), dataSource.getKafkaTopicName(), - consumerOffsets(topicAndEndOffsets, topicAndConsumerOffsets))) - .orElse(new SourceConsumerGroupOffsets("", dataSource.getKafkaTopicName(), Collections - .emptyList()))); + topicDescription.map(td -> + consumerGroupDescription.map( + cg -> new ConsumerGroupOffsets( + cg.groupId(), + dataSource.getKafkaTopicName(), + consumerOffsets(td, topicAndStartOffsets, topicAndEndOffsets, + topicAndConsumerOffsets))) + .orElse( + new ConsumerGroupOffsets("", "", Collections.emptyList()))) + .orElse(new ConsumerGroupOffsets("", "", Collections.emptyList()))); } - private static List consumerOffsets( + private static List consumerOffsets( + final TopicDescription topicDescription, + final Map topicAndStartOffsets, final Map topicAndEndOffsets, final Map topicAndConsumerOffsets ) { - List consumerOffsets = new ArrayList<>(); - for (Map.Entry entry : topicAndConsumerOffsets.entrySet()) { - consumerOffsets.add(new SourceConsumerOffset(entry.getKey().partition(), - topicAndEndOffsets.get(entry.getKey()).offset(), entry.getValue().offset())); + List consumerOffsets = new ArrayList<>(); + for (TopicPartitionInfo topicPartitionInfo : topicDescription.partitions()) { + final TopicPartition tp = new TopicPartition(topicDescription.name(), + topicPartitionInfo.partition()); + ListOffsetsResultInfo startOffsetResultInfo = topicAndStartOffsets.get(tp); + ListOffsetsResultInfo endOffsetResultInfo = topicAndEndOffsets.get(tp); + OffsetAndMetadata offsetAndMetadata = topicAndConsumerOffsets.get(tp); + consumerOffsets.add( + new ConsumerOffset( + topicPartitionInfo.partition(), + startOffsetResultInfo != null ? startOffsetResultInfo.offset() : 0, + endOffsetResultInfo != null ? endOffsetResultInfo.offset() : 0, + offsetAndMetadata != null ? offsetAndMetadata.offset() : 0 + )); } return consumerOffsets; } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/DescribeConnectorExecutor.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/DescribeConnectorExecutor.java index 8d48336c8216..166df7224f4e 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/DescribeConnectorExecutor.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/DescribeConnectorExecutor.java @@ -138,6 +138,7 @@ public Optional execute( Optional.empty(), Optional.empty(), ImmutableMap.of(), + ImmutableMap.of(), ImmutableMap.of())) .collect(Collectors.toList()); } else { diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListSourceExecutor.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListSourceExecutor.java index e607ed2d9612..d9bfe5cd564e 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListSourceExecutor.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListSourceExecutor.java @@ -208,6 +208,7 @@ private static SourceDescriptionWithWarnings describeSource( Optional topicDescription = Optional.empty(); Optional consumerGroupDescription = Optional.empty(); Map topicAndConsumerOffsets = new LinkedHashMap<>(); + Map topicAndStartOffsets = new LinkedHashMap<>(); Map topicAndEndOffsets = new LinkedHashMap<>(); final List warnings = new LinkedList<>(); if (extended) { @@ -223,11 +224,14 @@ private static SourceDescriptionWithWarnings describeSource( serviceContext.getAdminClient().describeConsumerGroups(Collections.singletonList(consumerGroupId)).describedGroups().get(consumerGroupId).get() ); topicAndConsumerOffsets = serviceContext.getAdminClient().listConsumerGroupOffsets(consumerGroupId).partitionsToOffsetAndMetadata().get(); - Map request = new LinkedHashMap<>(); + Map startRequest = new LinkedHashMap<>(); + Map endRequest = new LinkedHashMap<>(); for (Map.Entry entry: topicAndConsumerOffsets.entrySet()) { - request.put(entry.getKey(), OffsetSpec.earliest()); + startRequest.put(entry.getKey(), OffsetSpec.earliest()); + endRequest.put(entry.getKey(), OffsetSpec.latest()); } - topicAndEndOffsets = serviceContext.getAdminClient().listOffsets(request).all().get(); + topicAndStartOffsets = serviceContext.getAdminClient().listOffsets(startRequest).all().get(); + topicAndEndOffsets = serviceContext.getAdminClient().listOffsets(endRequest).all().get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } @@ -245,6 +249,7 @@ private static SourceDescriptionWithWarnings describeSource( sinkQueries, topicDescription, consumerGroupDescription, + topicAndStartOffsets, topicAndEndOffsets, topicAndConsumerOffsets ) diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/entity/SourceDescriptionFactoryTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/entity/SourceDescriptionFactoryTest.java index f6431826613e..6816108e469a 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/entity/SourceDescriptionFactoryTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/entity/SourceDescriptionFactoryTest.java @@ -122,6 +122,7 @@ public void shouldReturnStatsBasedOnKafkaTopic() { Optional.empty(), Optional.empty(), Collections.emptyMap(), + Collections.emptyMap(), Collections.emptyMap()); // Then: @@ -148,6 +149,7 @@ public void shouldReturnEmptyTimestampColumn() { Optional.empty(), Optional.empty(), Collections.emptyMap(), + Collections.emptyMap(), Collections.emptyMap()); // Then: @@ -173,6 +175,7 @@ public void shouldReturnTimestampColumnIfPresent() { Optional.empty(), Optional.empty(), Collections.emptyMap(), + Collections.emptyMap(), Collections.emptyMap()); // Then: diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListSourceExecutorTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListSourceExecutorTest.java index 3a91267c5267..a3ab52e8cc34 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListSourceExecutorTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListSourceExecutorTest.java @@ -149,6 +149,7 @@ public void shouldShowStreamsExtended() { Optional.of(topicWith1PartitionAndRfOf1), Optional.empty(), ImmutableMap.of(), + ImmutableMap.of(), ImmutableMap.of()), SourceDescriptionFactory.create( stream2, @@ -158,6 +159,7 @@ public void shouldShowStreamsExtended() { Optional.of(topicWith1PartitionAndRfOf1), Optional.empty(), ImmutableMap.of(), + ImmutableMap.of(), ImmutableMap.of()) )); } @@ -222,6 +224,7 @@ public void shouldShowTablesExtended() { Optional.of(client.describeTopic(table1.getKafkaTopicName())), Optional.empty(), ImmutableMap.of(), + ImmutableMap.of(), ImmutableMap.of() ), SourceDescriptionFactory.create( @@ -232,6 +235,7 @@ public void shouldShowTablesExtended() { Optional.of(client.describeTopic(table1.getKafkaTopicName())), Optional.empty(), ImmutableMap.of(), + ImmutableMap.of(), ImmutableMap.of() ) )); @@ -282,6 +286,7 @@ public void shouldShowColumnsSource() { Optional.empty(), Optional.empty(), ImmutableMap.of(), + ImmutableMap.of(), ImmutableMap.of()))); } @@ -347,6 +352,7 @@ private static void assertSourceListWithWarning( Optional.empty(), Optional.empty(), ImmutableMap.of(), + ImmutableMap.of(), ImmutableMap.of() ) ) @@ -436,6 +442,7 @@ public void shouldAddWarningOnClientExceptionForDescription() { Optional.empty(), Optional.empty(), ImmutableMap.of(), + ImmutableMap.of(), ImmutableMap.of() ) ) diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java index baa2a86ee70e..e115d3884057 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java @@ -487,6 +487,7 @@ public void shouldShowStreamsExtended() { Optional.of(kafkaTopicClient.describeTopic("KAFKA_TOPIC_2")), Optional.empty(), ImmutableMap.of(), + ImmutableMap.of(), ImmutableMap.of()), SourceDescriptionFactory.create( ksqlEngine.getMetaStore().getSource(SourceName.of("new_stream")), @@ -494,6 +495,7 @@ public void shouldShowStreamsExtended() { Optional.of(kafkaTopicClient.describeTopic("new_topic")), Optional.empty(), ImmutableMap.of(), + ImmutableMap.of(), ImmutableMap.of())) ); } @@ -523,6 +525,7 @@ public void shouldShowTablesExtended() { Optional.of(kafkaTopicClient.describeTopic("KAFKA_TOPIC_1")), Optional.empty(), ImmutableMap.of(), + ImmutableMap.of(), ImmutableMap.of()), SourceDescriptionFactory.create( ksqlEngine.getMetaStore().getSource(SourceName.of("new_table")), @@ -530,6 +533,7 @@ public void shouldShowTablesExtended() { Optional.of(kafkaTopicClient.describeTopic("new_topic")), Optional.empty(), ImmutableMap.of(), + ImmutableMap.of(), ImmutableMap.of())) ); } @@ -577,6 +581,7 @@ public void shouldDescribeStatement() { Optional.empty(), Optional.empty(), ImmutableMap.of(), + ImmutableMap.of(), ImmutableMap.of() ); diff --git a/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/SourceConsumerGroupOffsets.java b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/ConsumerGroupOffsets.java similarity index 79% rename from ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/SourceConsumerGroupOffsets.java rename to ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/ConsumerGroupOffsets.java index 576cadb038b8..e421534ca353 100644 --- a/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/SourceConsumerGroupOffsets.java +++ b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/ConsumerGroupOffsets.java @@ -7,16 +7,16 @@ import java.util.Objects; @JsonIgnoreProperties(ignoreUnknown = true) -public class SourceConsumerGroupOffsets { +public class ConsumerGroupOffsets { private final String groupId; private final String kafkaTopic; - private final List offsets; + private final List offsets; @JsonCreator - public SourceConsumerGroupOffsets( + public ConsumerGroupOffsets( @JsonProperty("groupId") String groupId, @JsonProperty("kafkaTopic") String kafkaTopic, - @JsonProperty("offsets") List offsets + @JsonProperty("offsets") List offsets ) { this.groupId = groupId; this.kafkaTopic = kafkaTopic; @@ -31,7 +31,7 @@ public String getKafkaTopic() { return kafkaTopic; } - public List getOffsets() { + public List getOffsets() { return offsets; } @@ -43,7 +43,7 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) { return false; } - SourceConsumerGroupOffsets that = (SourceConsumerGroupOffsets) o; + ConsumerGroupOffsets that = (ConsumerGroupOffsets) o; return Objects.equals(groupId, that.groupId) && Objects.equals(kafkaTopic, that.kafkaTopic) && Objects.equals(offsets, that.offsets); diff --git a/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/SourceConsumerOffset.java b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/ConsumerOffset.java similarity index 73% rename from ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/SourceConsumerOffset.java rename to ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/ConsumerOffset.java index 197380e38381..650f301e4d88 100644 --- a/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/SourceConsumerOffset.java +++ b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/ConsumerOffset.java @@ -6,19 +6,22 @@ import java.util.Objects; @JsonIgnoreProperties(ignoreUnknown = true) -public class SourceConsumerOffset { +public class ConsumerOffset { private final int partition; + private final long logStartOffset; private final long logEndOffset; private final long consumerOffset; @JsonCreator - public SourceConsumerOffset( + public ConsumerOffset( @JsonProperty("partition") int partition, + @JsonProperty("logStartOffset") long logStartOffset, @JsonProperty("logEndOffset") long logEndOffset, @JsonProperty("consumerOffset") long consumerOffset ) { this.partition = partition; + this.logStartOffset = logStartOffset; this.logEndOffset = logEndOffset; this.consumerOffset = consumerOffset; } @@ -31,6 +34,10 @@ public long getConsumerOffset() { return consumerOffset; } + public long getLogStartOffset() { + return logStartOffset; + } + public long getLogEndOffset() { return logEndOffset; } @@ -43,14 +50,15 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) { return false; } - SourceConsumerOffset that = (SourceConsumerOffset) o; + ConsumerOffset that = (ConsumerOffset) o; return partition == that.partition && + logStartOffset == that.logStartOffset && logEndOffset == that.logEndOffset && consumerOffset == that.consumerOffset; } @Override public int hashCode() { - return Objects.hash(partition, logEndOffset, consumerOffset); + return Objects.hash(partition, logStartOffset, logEndOffset, consumerOffset); } } diff --git a/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/SourceDescription.java b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/SourceDescription.java index b194ad8223e6..70e265686378 100644 --- a/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/SourceDescription.java +++ b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/SourceDescription.java @@ -47,7 +47,7 @@ public class SourceDescription { private final int partitions; private final int replication; private final String statement; - private final SourceConsumerGroupOffsets sourceConsumerGroupOffsets; + private final ConsumerGroupOffsets consumerGroupOffsets; // CHECKSTYLE_RULES.OFF: ParameterNumberCheck @JsonCreator @@ -68,7 +68,7 @@ public SourceDescription( @JsonProperty("partitions") final int partitions, @JsonProperty("replication") final int replication, @JsonProperty("statement") final String statement, - @JsonProperty("consumerGroupOffsets") SourceConsumerGroupOffsets sourceConsumerGroupOffsets) { + @JsonProperty("consumerGroupOffsets") ConsumerGroupOffsets consumerGroupOffsets) { // CHECKSTYLE_RULES.ON: ParameterNumberCheck this.name = Objects.requireNonNull(name, "name"); this.windowType = Objects.requireNonNull(windowType, "windowType"); @@ -89,7 +89,7 @@ public SourceDescription( this.partitions = partitions; this.replication = replication; this.statement = Objects.requireNonNull(statement, "statement"); - this.sourceConsumerGroupOffsets = sourceConsumerGroupOffsets; + this.consumerGroupOffsets = Objects.requireNonNull(consumerGroupOffsets, "consumerGroupOffsets"); } public String getStatement() { @@ -156,8 +156,8 @@ public String getErrorStats() { return errorStats; } - public SourceConsumerGroupOffsets getSourceConsumerGroupOffsets() { - return sourceConsumerGroupOffsets; + public ConsumerGroupOffsets getConsumerGroupOffsets() { + return consumerGroupOffsets; } // CHECKSTYLE_RULES.OFF: CyclomaticComplexity @@ -187,7 +187,7 @@ public boolean equals(final Object o) { && Objects.equals(valueFormat, that.valueFormat) && Objects.equals(topic, that.topic) && Objects.equals(statement, that.statement) - && Objects.equals(sourceConsumerGroupOffsets, that.sourceConsumerGroupOffsets); + && Objects.equals(consumerGroupOffsets, that.consumerGroupOffsets); } @Override @@ -209,7 +209,7 @@ public int hashCode() { partitions, replication, statement, - sourceConsumerGroupOffsets + consumerGroupOffsets ); } } diff --git a/ksqldb-rest-model/src/test/java/io/confluent/ksql/rest/entity/SourceDescriptionTest.java b/ksqldb-rest-model/src/test/java/io/confluent/ksql/rest/entity/SourceDescriptionTest.java index d44abbc60ad8..956c113cd4d8 100644 --- a/ksqldb-rest-model/src/test/java/io/confluent/ksql/rest/entity/SourceDescriptionTest.java +++ b/ksqldb-rest-model/src/test/java/io/confluent/ksql/rest/entity/SourceDescriptionTest.java @@ -33,8 +33,8 @@ public class SourceDescriptionTest { private static final String SOME_STRING = "some string"; private static final int SOME_INT = 3; private static final boolean SOME_BOOL = true; - private static final SourceConsumerGroupOffsets SOME_CG_OFFSETS = - new SourceConsumerGroupOffsets("group", "topic", new ArrayList<>()); + private static final ConsumerGroupOffsets SOME_CG_OFFSETS = + new ConsumerGroupOffsets("group", "topic", new ArrayList<>()); @Mock private RunningQuery query1;