From dd3eb5ffdbbb4d285b2558e7f8285abcc28a9e70 Mon Sep 17 00:00:00 2001 From: cpettitt-confluent <53191309+cpettitt-confluent@users.noreply.github.com> Date: Wed, 7 Aug 2019 15:27:55 -0600 Subject: [PATCH] feat: Add SHOW TOPICS EXTENDED (#3183) Fixes #1268 BREAKING CHANGE: "SHOW TOPICS" no longer includes the "Consumers" and "ConsumerGroups" columns. You can use "SHOW TOPICS EXTENDED" to get the output previous emitted from "SHOW TOPICS". See below for examples. This change splits "SHOW TOPICS" into two commands: 1. "SHOW TOPICS EXTENDED", which shows what was previously shown by "SHOW TOPICS". Sample output: ``` ksql> show topics extended; Kafka Topic | Partitions | Partition Replicas | Consumers | ConsumerGroups -------------------------------------------------------------------------------------------------------------------------------------------------------------- _confluent-command | 1 | 1 | 1 | 1 _confluent-controlcenter-5-3-0-1-actual-group-consumption-rekey | 1 | 1 | 1 | 1 ``` 2. "SHOW TOPICS", which now no longer queries consumer groups and their active consumers. Sample output: ``` ksql> show topics; Kafka Topic | Partitions | Partition Replicas --------------------------------------------------------------------------------------------------------------------------------- _confluent-command | 1 | 1 _confluent-controlcenter-5-3-0-1-actual-group-consumption-rekey | 1 | 1 ``` --- .../query-with-structured-data.rst | 12 +- docs/developer-guide/syntax-reference.rst | 7 +- docs/includes/ksql-includes.rst | 14 +- docs/tutorials/basics-control-center.rst | 4 +- .../confluent/ksql/cli/console/Console.java | 7 +- .../builder/KafkaTopicsListTableBuilder.java | 64 +++++--- .../java/io/confluent/ksql/cli/CliTest.java | 10 +- .../io/confluent/ksql/parser/SqlBase.g4 | 2 +- .../io/confluent/ksql/parser/AstBuilder.java | 2 +- .../ksql/parser/tree/ListTopics.java | 25 +++- .../confluent/ksql/parser/KsqlParserTest.java | 25 +++- .../ksql/parser/tree/ListTopicsTest.java | 10 +- .../ksql/rest/entity/KafkaTopicInfo.java | 16 +- .../rest/entity/KafkaTopicInfoExtended.java | 79 ++++++++++ .../ksql/rest/entity/KafkaTopicsList.java | 98 ------------ .../rest/entity/KafkaTopicsListExtended.java | 62 ++++++++ .../ksql/rest/entity/KsqlEntity.java | 1 + .../server/execution/ListTopicsExecutor.java | 141 ++++++++++++++++-- .../entity/KafkaTopicsListExtendedTest.java | 54 +++++++ .../ksql/rest/entity/KafkaTopicsListTest.java | 61 +------- .../execution/ListTopicsExecutorTest.java | 43 +++++- 21 files changed, 501 insertions(+), 236 deletions(-) create mode 100644 ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/KafkaTopicInfoExtended.java create mode 100644 ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/KafkaTopicsListExtended.java create mode 100644 ksql-rest-app/src/test/java/io/confluent/ksql/rest/entity/KafkaTopicsListExtendedTest.java diff --git a/docs/developer-guide/query-with-structured-data.rst b/docs/developer-guide/query-with-structured-data.rst index 84cde91c9f24..33d7cb597a8a 100644 --- a/docs/developer-guide/query-with-structured-data.rst +++ b/docs/developer-guide/query-with-structured-data.rst @@ -121,12 +121,12 @@ Your output should resemble: :: - Kafka Topic | Partitions | Partition Replicas | Consumers | ConsumerGroups - ---------------------------------------------------------------------------------- - _confluent-metrics | 12 | 1 | 0 | 0 - _schemas | 1 | 1 | 0 | 0 - raw-topic | 1 | 1 | 0 | 0 - ---------------------------------------------------------------------------------- + Kafka Topic | Partitions | Partition Replicas + ------------------------------------------------------ + _confluent-metrics | 12 | 1 + _schemas | 1 | 1 + raw-topic | 1 | 1 + ------------------------------------------------------ Inspect ``raw-topic`` to ensure that |kcat| populated it: diff --git a/docs/developer-guide/syntax-reference.rst b/docs/developer-guide/syntax-reference.rst index 75172e20936a..4b8ff472b383 100644 --- a/docs/developer-guide/syntax-reference.rst +++ b/docs/developer-guide/syntax-reference.rst @@ -1338,13 +1338,14 @@ SHOW TOPICS .. code:: sql - SHOW | LIST TOPICS; + SHOW | LIST TOPICS [EXTENDED]; **Description** -List the available topics in the Kafka cluster that KSQL is configured +SHOW TOPICS lists the available topics in the Kafka cluster that KSQL is configured to connect to (default setting for ``bootstrap.servers``: -``localhost:9092``). +``localhost:9092``). SHOW TOPICS EXTENDED also displays consumer groups and their active consumer +counts. .. _show-streams: diff --git a/docs/includes/ksql-includes.rst b/docs/includes/ksql-includes.rst index 2ead7ac65db6..5375d446fba2 100644 --- a/docs/includes/ksql-includes.rst +++ b/docs/includes/ksql-includes.rst @@ -126,13 +126,13 @@ Your output should resemble: :: - Kafka Topic | Partitions | Partition Replicas | Consumers | ConsumerGroups - ----------------------------------------------------------------------------------- - _confluent-metrics | 12 | 1 | 0 | 0 - _schemas | 1 | 1 | 0 | 0 - pageviews | 1 | 1 | 0 | 0 - users | 1 | 1 | 0 | 0 - ----------------------------------------------------------------------------------- + Kafka Topic | Partitions | Partition Replicas + ------------------------------------------------------ + _confluent-metrics | 12 | 1 + _schemas | 1 | 1 + pageviews | 1 | 1 + users | 1 | 1 + ------------------------------------------------------ Inspect the ``users`` topic by using the PRINT statement: diff --git a/docs/tutorials/basics-control-center.rst b/docs/tutorials/basics-control-center.rst index 41e643d254a3..457a6d1020aa 100644 --- a/docs/tutorials/basics-control-center.rst +++ b/docs/tutorials/basics-control-center.rst @@ -154,9 +154,7 @@ statements in KSQL Editor, just like you use them in the KSQL CLI. "registered": true, "replicaInfo": [ 1 - ], - "consumerCount": 0, - "consumerGroupCount": 0 + ] }, The ``"registered": true`` indicator means that you have registered the topic diff --git a/ksql-cli/src/main/java/io/confluent/ksql/cli/console/Console.java b/ksql-cli/src/main/java/io/confluent/ksql/cli/console/Console.java index f86ea7cf2771..3426ad47b3b8 100644 --- a/ksql-cli/src/main/java/io/confluent/ksql/cli/console/Console.java +++ b/ksql-cli/src/main/java/io/confluent/ksql/cli/console/Console.java @@ -49,6 +49,7 @@ import io.confluent.ksql.rest.entity.FunctionInfo; import io.confluent.ksql.rest.entity.FunctionNameList; import io.confluent.ksql.rest.entity.KafkaTopicsList; +import io.confluent.ksql.rest.entity.KafkaTopicsListExtended; import io.confluent.ksql.rest.entity.KsqlEntity; import io.confluent.ksql.rest.entity.KsqlEntityList; import io.confluent.ksql.rest.entity.KsqlErrorMessage; @@ -130,7 +131,11 @@ public class Console implements Closeable { .put(TablesList.class, tablePrinter(TablesList.class, TablesListTableBuilder::new)) .put(KafkaTopicsList.class, - tablePrinter(KafkaTopicsList.class, KafkaTopicsListTableBuilder::new)) + tablePrinter(KafkaTopicsList.class, KafkaTopicsListTableBuilder.SimpleBuilder::new)) + .put(KafkaTopicsListExtended.class, + tablePrinter( + KafkaTopicsListExtended.class, + KafkaTopicsListTableBuilder.ExtendedBuilder::new)) .put(ExecutionPlan.class, tablePrinter(ExecutionPlan.class, ExecutionPlanTableBuilder::new)) .put(FunctionNameList.class, diff --git a/ksql-cli/src/main/java/io/confluent/ksql/cli/console/table/builder/KafkaTopicsListTableBuilder.java b/ksql-cli/src/main/java/io/confluent/ksql/cli/console/table/builder/KafkaTopicsListTableBuilder.java index 0da2b82fee0f..e2ed1f9e912a 100644 --- a/ksql-cli/src/main/java/io/confluent/ksql/cli/console/table/builder/KafkaTopicsListTableBuilder.java +++ b/ksql-cli/src/main/java/io/confluent/ksql/cli/console/table/builder/KafkaTopicsListTableBuilder.java @@ -19,33 +19,57 @@ import io.confluent.ksql.cli.console.table.Table; import io.confluent.ksql.cli.console.table.Table.Builder; import io.confluent.ksql.rest.entity.KafkaTopicsList; +import io.confluent.ksql.rest.entity.KafkaTopicsListExtended; import io.confluent.ksql.util.StringUtil; import java.util.List; import java.util.stream.Stream; -public class KafkaTopicsListTableBuilder implements TableBuilder { +public class KafkaTopicsListTableBuilder { - private static final List HEADERS = ImmutableList.of( - "Kafka Topic", - "Partitions", - "Partition Replicas", - "Consumers", - "ConsumerGroups"); + public static class SimpleBuilder implements TableBuilder { + private static final List HEADERS = ImmutableList.of( + "Kafka Topic", + "Partitions", + "Partition Replicas"); - @Override - public Table buildTable(final KafkaTopicsList entity) { - final Stream> rows = entity.getTopics().stream() - .map(t -> ImmutableList.of( - t.getName(), - Integer.toString(t.getReplicaInfo().size()), - getTopicReplicaInfo(t.getReplicaInfo()), - Integer.toString(t.getConsumerCount()), - Integer.toString(t.getConsumerGroupCount()))); + @Override + public Table buildTable(final KafkaTopicsList entity) { + final Stream> rows = entity.getTopics().stream() + .map(t -> ImmutableList.of( + t.getName(), + Integer.toString(t.getReplicaInfo().size()), + getTopicReplicaInfo(t.getReplicaInfo()))); - return new Builder() - .withColumnHeaders(HEADERS) - .withRows(rows) - .build(); + return new Builder() + .withColumnHeaders(HEADERS) + .withRows(rows) + .build(); + } + } + + public static class ExtendedBuilder implements TableBuilder { + private static final List HEADERS = ImmutableList.of( + "Kafka Topic", + "Partitions", + "Partition Replicas", + "Consumers", + "ConsumerGroups"); + + @Override + public Table buildTable(final KafkaTopicsListExtended entity) { + final Stream> rows = entity.getTopics().stream() + .map(t -> ImmutableList.of( + t.getName(), + Integer.toString(t.getReplicaInfo().size()), + getTopicReplicaInfo(t.getReplicaInfo()), + Integer.toString(t.getConsumerCount()), + Integer.toString(t.getConsumerGroupCount()))); + + return new Builder() + .withColumnHeaders(HEADERS) + .withRows(rows) + .build(); + } } /** diff --git a/ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java b/ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java index b159e784a2de..3e746e74ee52 100644 --- a/ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java +++ b/ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java @@ -355,9 +355,17 @@ private void selectWithLimit( @Test public void shouldPrintResultsForListOrShowCommands() { - assertRunListCommand( "topics", + hasRow( + equalTo(orderDataProvider.topicName()), + equalTo("1"), + equalTo("1") + ) + ); + + assertRunListCommand( + "topics extended", hasRow( equalTo(orderDataProvider.topicName()), equalTo("1"), diff --git a/ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 b/ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 index 99df40dd02c9..a6a1786ed0c2 100644 --- a/ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 +++ b/ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 @@ -34,7 +34,7 @@ singleExpression statement : query #queryStatement | (LIST | SHOW) PROPERTIES #listProperties - | (LIST | SHOW) TOPICS #listTopics + | (LIST | SHOW) TOPICS EXTENDED? #listTopics | (LIST | SHOW) STREAMS EXTENDED? #listStreams | (LIST | SHOW) TABLES EXTENDED? #listTables | (LIST | SHOW) FUNCTIONS #listFunctions diff --git a/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java b/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java index f4382a8f7e4c..43929e1dfc82 100644 --- a/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java +++ b/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java @@ -561,7 +561,7 @@ public Node visitRunScript(final SqlBaseParser.RunScriptContext context) { @Override public Node visitListTopics(final SqlBaseParser.ListTopicsContext context) { - return new ListTopics(getLocation(context)); + return new ListTopics(getLocation(context), context.EXTENDED() != null); } @Override diff --git a/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/ListTopics.java b/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/ListTopics.java index 69afeed151fd..ea3304078156 100644 --- a/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/ListTopics.java +++ b/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/ListTopics.java @@ -22,27 +22,38 @@ public class ListTopics extends Statement { - public ListTopics(final Optional location) { + private final boolean showExtended; + + public ListTopics(final Optional location, final boolean showExtended) { super(location); + this.showExtended = showExtended; } - @Override - public int hashCode() { - return Objects.hash(getClass()); + public boolean getShowExtended() { + return showExtended; } @Override - public boolean equals(final Object obj) { - if (this == obj) { + public boolean equals(final Object o) { + if (this == o) { return true; } + if (o == null || getClass() != o.getClass()) { + return false; + } + final ListTopics that = (ListTopics) o; + return showExtended == that.showExtended; + } - return obj != null && obj.getClass().equals(getClass()); + @Override + public int hashCode() { + return Objects.hash(showExtended); } @Override public String toString() { return toStringHelper(this) + .add("showExtended", showExtended) .toString(); } } diff --git a/ksql-parser/src/test/java/io/confluent/ksql/parser/KsqlParserTest.java b/ksql-parser/src/test/java/io/confluent/ksql/parser/KsqlParserTest.java index de62cfdcfc4d..40527a29e5e0 100644 --- a/ksql-parser/src/test/java/io/confluent/ksql/parser/KsqlParserTest.java +++ b/ksql-parser/src/test/java/io/confluent/ksql/parser/KsqlParserTest.java @@ -619,11 +619,17 @@ public void testSelectSessionWindow() { @Test public void testShowTopics() { + // Given: final String simpleQuery = "SHOW TOPICS;"; + + // When: final Statement statement = KsqlParserTestUtil.buildSingleAst(simpleQuery, metaStore).getStatement(); - Assert.assertTrue(statement instanceof ListTopics); final ListTopics listTopics = (ListTopics) statement; - Assert.assertTrue(listTopics.toString().equalsIgnoreCase("ListTopics{}")); + + // Then: + Assert.assertTrue(statement instanceof ListTopics); + Assert.assertThat(listTopics.toString(), is("ListTopics{showExtended=false}")); + Assert.assertThat(listTopics.getShowExtended(), is(false)); } @Test @@ -761,6 +767,21 @@ public void shouldSetShowDescriptionsForShowStreamsDescriptions() { Assert.assertThat(listStreams.getShowExtended(), is(true)); } + @Test + public void shouldSetShowDescriptionsForShowTopicsDescriptions() { + // Given: + final String statementString = "SHOW TOPICS EXTENDED;"; + + // When: + final Statement statement = KsqlParserTestUtil.buildSingleAst(statementString, metaStore) + .getStatement(); + + // Then: + Assert.assertThat(statement, instanceOf(ListTopics.class)); + final ListTopics listTopics = (ListTopics)statement; + Assert.assertThat(listTopics.getShowExtended(), is(true)); + } + @Test public void shouldSetShowDescriptionsForShowTablesDescriptions() { final String statementString = "SHOW TABLES EXTENDED;"; diff --git a/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/ListTopicsTest.java b/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/ListTopicsTest.java index 1dab38a441ad..ef34e4d3bfaf 100644 --- a/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/ListTopicsTest.java +++ b/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/ListTopicsTest.java @@ -26,11 +26,15 @@ public class ListTopicsTest { @Test public void shouldImplementHashCodeAndEqualsProperty() { + // Note: At the moment location does not take part in equality testing new EqualsTester() .addEqualityGroup( - // Note: At the moment location does not take part in equality testing - new ListTopics(Optional.of(SOME_LOCATION)), - new ListTopics(Optional.of(OTHER_LOCATION)) + new ListTopics(Optional.of(SOME_LOCATION), true), + new ListTopics(Optional.of(OTHER_LOCATION), true) + ) + .addEqualityGroup( + new ListTopics(Optional.of(SOME_LOCATION), false), + new ListTopics(Optional.of(OTHER_LOCATION), false) ) .testEquals(); } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/KafkaTopicInfo.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/KafkaTopicInfo.java index cd246441598e..6e6c9858f910 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/KafkaTopicInfo.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/KafkaTopicInfo.java @@ -28,20 +28,14 @@ public class KafkaTopicInfo { private final String name; private final List replicaInfo; - private final int consumerGroupCount; - private final int consumerCount; @JsonCreator public KafkaTopicInfo( @JsonProperty("name") final String name, - @JsonProperty("replicaInfo") final List replicaInfo, - @JsonProperty("consumerCount") final int consumerCount, - @JsonProperty("consumerGroupCount") final int consumerGroupCount + @JsonProperty("replicaInfo") final List replicaInfo ) { this.name = name; this.replicaInfo = replicaInfo; - this.consumerGroupCount = consumerGroupCount; - this.consumerCount = consumerCount; } public String getName() { @@ -52,14 +46,6 @@ public List getReplicaInfo() { return replicaInfo; } - public int getConsumerCount() { - return consumerCount; - } - - public int getConsumerGroupCount() { - return consumerGroupCount; - } - @Override public boolean equals(final Object o) { if (this == o) { diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/KafkaTopicInfoExtended.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/KafkaTopicInfoExtended.java new file mode 100644 index 000000000000..ac39085b09a0 --- /dev/null +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/KafkaTopicInfoExtended.java @@ -0,0 +1,79 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.entity; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonSubTypes; +import java.util.List; +import java.util.Objects; + +@JsonIgnoreProperties(ignoreUnknown = true) +@JsonSubTypes({}) +public class KafkaTopicInfoExtended { + + private final String name; + private final List replicaInfo; + private final int consumerGroupCount; + private final int consumerCount; + + @JsonCreator + public KafkaTopicInfoExtended( + @JsonProperty("name") final String name, + @JsonProperty("replicaInfo") final List replicaInfo, + @JsonProperty("consumerCount") final int consumerCount, + @JsonProperty("consumerGroupCount") final int consumerGroupCount + ) { + this.name = name; + this.replicaInfo = replicaInfo; + this.consumerGroupCount = consumerGroupCount; + this.consumerCount = consumerCount; + } + + public String getName() { + return name; + } + + public List getReplicaInfo() { + return replicaInfo; + } + + public int getConsumerCount() { + return consumerCount; + } + + public int getConsumerGroupCount() { + return consumerGroupCount; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final KafkaTopicInfoExtended that = (KafkaTopicInfoExtended) o; + return Objects.equals(name, that.name); + } + + @Override + public int hashCode() { + return Objects.hash(name); + } +} diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/KafkaTopicsList.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/KafkaTopicsList.java index 19a34e4dddc8..904833125044 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/KafkaTopicsList.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/KafkaTopicsList.java @@ -19,24 +19,10 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; -import io.confluent.ksql.util.KafkaConsumerGroupClient; -import io.confluent.ksql.util.KafkaConsumerGroupClientImpl; -import io.confluent.ksql.util.KsqlConfig; -import io.confluent.ksql.util.KsqlConstants; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Objects; -import java.util.Set; -import java.util.TreeMap; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; -import org.apache.kafka.clients.admin.TopicDescription; -import org.apache.kafka.common.TopicPartition; @JsonIgnoreProperties(ignoreUnknown = true) public class KafkaTopicsList extends KsqlEntity { @@ -73,88 +59,4 @@ public boolean equals(final Object o) { public int hashCode() { return Objects.hash(getTopics()); } - - public static KafkaTopicsList build( - final String statementText, - final Map kafkaTopicDescriptions, - final KsqlConfig ksqlConfig, - final KafkaConsumerGroupClient consumerGroupClient - ) { - final List kafkaTopicInfoList = new ArrayList<>(); - final Map filteredDescriptions = new TreeMap<>( - filterKsqlInternalTopics(kafkaTopicDescriptions, ksqlConfig)); - - final Map> topicConsumersAndGroupCount = getTopicConsumerAndGroupCounts( - consumerGroupClient); - - for (final TopicDescription desp : filteredDescriptions.values()) { - kafkaTopicInfoList.add(new KafkaTopicInfo( - desp.name(), - desp.partitions() - .stream().map(partition -> partition.replicas().size()).collect(Collectors.toList()), - topicConsumersAndGroupCount.getOrDefault(desp.name(), Arrays.asList(0, 0)).get(0), - topicConsumersAndGroupCount.getOrDefault(desp.name(), Arrays.asList(0, 0)).get(1) - )); - } - return new KafkaTopicsList(statementText, kafkaTopicInfoList); - } - - /** - * @return all topics with their associated consumerCount and consumerGroupCount - */ - private static Map> getTopicConsumerAndGroupCounts( - final KafkaConsumerGroupClient consumerGroupClient - ) { - - final List consumerGroups = consumerGroupClient.listGroups(); - - final Map topicConsumerCount = new HashMap<>(); - final Map> topicConsumerGroupCount = new HashMap<>(); - - for (final String group : consumerGroups) { - final Collection consumerSummaryList = - consumerGroupClient.describeConsumerGroup(group).consumers(); - - for (final KafkaConsumerGroupClientImpl.ConsumerSummary summary : consumerSummaryList) { - - for (final TopicPartition topicPartition : summary.partitions()) { - topicConsumerCount - .computeIfAbsent(topicPartition.topic(), k -> new AtomicInteger()) - .incrementAndGet(); - topicConsumerGroupCount - .computeIfAbsent(topicPartition.topic(), k -> new HashSet<>()).add(group); - } - } - } - final HashMap> results = new HashMap<>(); - topicConsumerCount.forEach( - (k, v) -> { - results.computeIfAbsent(k, v1 -> new ArrayList<>()).add(v.intValue()); - results.get(k).add(topicConsumerGroupCount.get(k).size()); - } - ); - - return results; - } - - private static Map filterKsqlInternalTopics( - final Map kafkaTopicDescriptions, final KsqlConfig ksqlConfig - ) { - final Map filteredKafkaTopics = new HashMap<>(); - final String serviceId = KsqlConstants.KSQL_INTERNAL_TOPIC_PREFIX - + ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG); - final String persistentQueryPrefix = ksqlConfig.getString( - KsqlConfig.KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG); - final String transientQueryPrefix = ksqlConfig.getString( - KsqlConfig.KSQL_TRANSIENT_QUERY_NAME_PREFIX_CONFIG); - - for (final Map.Entry entry : kafkaTopicDescriptions.entrySet()) { - if (!entry.getKey().startsWith(serviceId + persistentQueryPrefix) - && !entry.getKey().startsWith(serviceId + transientQueryPrefix)) { - filteredKafkaTopics.put(entry.getKey().toLowerCase(), entry.getValue()); - } - } - return filteredKafkaTopics; - } - } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/KafkaTopicsListExtended.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/KafkaTopicsListExtended.java new file mode 100644 index 000000000000..1c7fbcbce31a --- /dev/null +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/KafkaTopicsListExtended.java @@ -0,0 +1,62 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.entity; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Objects; + +@JsonIgnoreProperties(ignoreUnknown = true) +public class KafkaTopicsListExtended extends KsqlEntity { + + private final Collection topics; + + @JsonCreator + public KafkaTopicsListExtended( + @JsonProperty("statementText") final String statementText, + @JsonProperty("topics") final Collection topics + ) { + super(statementText); + Preconditions.checkNotNull(topics, "topics field must not be null"); + this.topics = topics; + } + + public List getTopics() { + return new ArrayList<>(topics); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (!(o instanceof KafkaTopicsListExtended)) { + return false; + } + final KafkaTopicsListExtended that = (KafkaTopicsListExtended) o; + return Objects.equals(getTopics(), that.getTopics()); + } + + @Override + public int hashCode() { + return Objects.hash(getTopics()); + } +} diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/KsqlEntity.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/KsqlEntity.java index c97abead169b..a20e2a11229f 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/KsqlEntity.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/KsqlEntity.java @@ -37,6 +37,7 @@ @JsonSubTypes.Type(value = StreamsList.class, name = "streams"), @JsonSubTypes.Type(value = TablesList.class, name = "tables"), @JsonSubTypes.Type(value = KafkaTopicsList.class, name = "kafka_topics"), + @JsonSubTypes.Type(value = KafkaTopicsListExtended.class, name = "kafka_topics_extended"), @JsonSubTypes.Type(value = ExecutionPlan.class, name = "executionPlan"), @JsonSubTypes.Type(value = SourceDescriptionList.class, name = "source_descriptions"), @JsonSubTypes.Type(value = QueryDescriptionList.class, name = "query_descriptions"), diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListTopicsExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListTopicsExecutor.java index a78e06ec5d34..c6e52f4fb48e 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListTopicsExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListTopicsExecutor.java @@ -17,18 +17,39 @@ import io.confluent.ksql.KsqlExecutionContext; import io.confluent.ksql.parser.tree.ListTopics; +import io.confluent.ksql.rest.entity.KafkaTopicInfo; +import io.confluent.ksql.rest.entity.KafkaTopicInfoExtended; import io.confluent.ksql.rest.entity.KafkaTopicsList; +import io.confluent.ksql.rest.entity.KafkaTopicsListExtended; import io.confluent.ksql.rest.entity.KsqlEntity; import io.confluent.ksql.services.KafkaTopicClient; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.statement.ConfiguredStatement; import io.confluent.ksql.util.KafkaConsumerGroupClient; +import io.confluent.ksql.util.KafkaConsumerGroupClient.ConsumerSummary; import io.confluent.ksql.util.KafkaConsumerGroupClientImpl; +import io.confluent.ksql.util.KsqlConfig; +import io.confluent.ksql.util.KsqlConstants; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.common.TopicPartition; public final class ListTopicsExecutor { - private ListTopicsExecutor() { } + private ListTopicsExecutor() { + + } public static Optional execute( final ConfiguredStatement statement, @@ -36,14 +57,114 @@ public static Optional execute( final ServiceContext serviceContext ) { final KafkaTopicClient client = serviceContext.getTopicClient(); - final KafkaConsumerGroupClient kafkaConsumerGroupClient - = new KafkaConsumerGroupClientImpl(serviceContext.getAdminClient()); - - return Optional.of(KafkaTopicsList.build( - statement.getStatementText(), - client.describeTopics(client.listNonInternalTopicNames()), - statement.getConfig(), - kafkaConsumerGroupClient - )); + + final Map kafkaTopicDescriptions + = client.describeTopics(client.listNonInternalTopicNames()); + + final Map filteredDescriptions = new TreeMap<>( + filterKsqlInternalTopics(kafkaTopicDescriptions, statement.getConfig())); + + if (statement.getStatement().getShowExtended()) { + final KafkaConsumerGroupClient consumerGroupClient + = new KafkaConsumerGroupClientImpl(serviceContext.getAdminClient()); + final Map> topicConsumersAndGroupCount + = getTopicConsumerAndGroupCounts(consumerGroupClient); + + final List topicInfoExtendedList = filteredDescriptions.values() + .stream().map(desc -> + topicDescriptionToTopicInfoExtended(desc, topicConsumersAndGroupCount)) + .collect(Collectors.toList()); + + return Optional.of( + new KafkaTopicsListExtended(statement.getStatementText(), topicInfoExtendedList)); + } else { + final List topicInfoList = filteredDescriptions.values() + .stream().map(desc -> topicDescriptionToTopicInfo(desc)) + .collect(Collectors.toList()); + + return Optional.of(new KafkaTopicsList(statement.getStatementText(), topicInfoList)); + } + } + + private static KafkaTopicInfo topicDescriptionToTopicInfo(final TopicDescription description) { + return new KafkaTopicInfo( + description.name(), + description.partitions() + .stream().map(partition -> partition.replicas().size()).collect(Collectors.toList())); + } + + private static KafkaTopicInfoExtended topicDescriptionToTopicInfoExtended( + final TopicDescription topicDescription, + final Map> topicConsumersAndGroupCount + ) { + + final List consumerAndGroupCount = topicConsumersAndGroupCount + .getOrDefault(topicDescription.name(), Arrays.asList(0, 0)); + + return new KafkaTopicInfoExtended( + topicDescription.name(), + topicDescription.partitions() + .stream().map(partition -> partition.replicas().size()).collect(Collectors.toList()), + consumerAndGroupCount.get(0), + consumerAndGroupCount.get(1)); + } + + private static Map filterKsqlInternalTopics( + final Map kafkaTopicDescriptions, + final KsqlConfig ksqlConfig + ) { + final Map filteredKafkaTopics = new HashMap<>(); + final String serviceId = KsqlConstants.KSQL_INTERNAL_TOPIC_PREFIX + + ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG); + final String persistentQueryPrefix = ksqlConfig.getString( + KsqlConfig.KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG); + final String transientQueryPrefix = ksqlConfig.getString( + KsqlConfig.KSQL_TRANSIENT_QUERY_NAME_PREFIX_CONFIG); + + for (final Map.Entry entry : kafkaTopicDescriptions.entrySet()) { + if (!entry.getKey().startsWith(serviceId + persistentQueryPrefix) + && !entry.getKey().startsWith(serviceId + transientQueryPrefix)) { + filteredKafkaTopics.put(entry.getKey().toLowerCase(), entry.getValue()); + } + } + return filteredKafkaTopics; + } + + /** + * @return all topics with their associated consumerCount and consumerGroupCount + */ + private static Map> getTopicConsumerAndGroupCounts( + final KafkaConsumerGroupClient consumerGroupClient + ) { + + final List consumerGroups = consumerGroupClient.listGroups(); + + final Map topicConsumerCount = new HashMap<>(); + final Map> topicConsumerGroupCount = new HashMap<>(); + + for (final String group : consumerGroups) { + final Collection consumerSummaryList = + consumerGroupClient.describeConsumerGroup(group).consumers(); + + for (final KafkaConsumerGroupClientImpl.ConsumerSummary summary : consumerSummaryList) { + + for (final TopicPartition topicPartition : summary.partitions()) { + topicConsumerCount + .computeIfAbsent(topicPartition.topic(), k -> new AtomicInteger()) + .incrementAndGet(); + topicConsumerGroupCount + .computeIfAbsent(topicPartition.topic(), k -> new HashSet<>()).add(group); + } + } + } + final HashMap> results = new HashMap<>(); + topicConsumerCount.forEach( + (k, v) -> { + results.computeIfAbsent(k, v1 -> new ArrayList<>()).add(v.intValue()); + results.get(k).add(topicConsumerGroupCount.get(k).size()); + } + ); + + return results; } } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/entity/KafkaTopicsListExtendedTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/entity/KafkaTopicsListExtendedTest.java new file mode 100644 index 000000000000..462ff08652e7 --- /dev/null +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/entity/KafkaTopicsListExtendedTest.java @@ -0,0 +1,54 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.entity; + +import static org.junit.Assert.assertEquals; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import io.confluent.ksql.json.JsonMapper; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class KafkaTopicsListExtendedTest { + + @Test + public void testSerde() throws Exception { + // Given: + final ObjectMapper mapper = JsonMapper.INSTANCE.mapper; + final KafkaTopicsListExtended expected = new KafkaTopicsListExtended( + "SHOW TOPICS EXTENDED;", + ImmutableList.of(new KafkaTopicInfoExtended("thetopic", ImmutableList.of(1, 2, 3), 42, 12)) + ); + + // When: + final String json = mapper.writeValueAsString(expected); + final KafkaTopicsListExtended actual = mapper.readValue(json, KafkaTopicsListExtended.class); + + // Then: + assertEquals( + "{" + + "\"@type\":\"kafka_topics_extended\"," + + "\"statementText\":\"SHOW TOPICS EXTENDED;\"," + + "\"topics\":[" + + "{\"name\":\"thetopic\",\"replicaInfo\":[1,2,3],\"consumerCount\":42,\"consumerGroupCount\":12}" + + "],\"warnings\":[]}", + json); + assertEquals(expected, actual); + } +} diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/entity/KafkaTopicsListTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/entity/KafkaTopicsListTest.java index a87324e5aba3..477d75aafab0 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/entity/KafkaTopicsListTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/entity/KafkaTopicsListTest.java @@ -15,83 +15,38 @@ package io.confluent.ksql.rest.entity; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.mock; -import static org.easymock.EasyMock.replay; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import io.confluent.ksql.json.JsonMapper; -import io.confluent.ksql.util.KafkaConsumerGroupClient; -import io.confluent.ksql.util.KafkaConsumerGroupClientImpl; -import io.confluent.ksql.util.KsqlConfig; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import org.apache.kafka.clients.admin.TopicDescription; -import org.apache.kafka.common.Node; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.TopicPartitionInfo; import org.junit.Test; public class KafkaTopicsListTest { - - @Test - public void shouldBuildValidTopicList() { - - // represent the full list of topics - final Map topicDescriptions = new HashMap<>(); - final TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(1, new Node(1, "", 8088), - Collections.emptyList(), Collections.emptyList()); - topicDescriptions.put("test-topic", new TopicDescription("test-topic", false, Collections.singletonList(topicPartitionInfo))); - - - final TopicPartition topicPartition = new TopicPartition("test-topic", 1); - final KafkaConsumerGroupClientImpl.ConsumerSummary consumerSummary = new KafkaConsumerGroupClientImpl.ConsumerSummary("consumer-id"); - consumerSummary.addPartition(topicPartition); - final KafkaConsumerGroupClientImpl.ConsumerGroupSummary consumerGroupSummary - = new KafkaConsumerGroupClientImpl.ConsumerGroupSummary(Collections.singleton(consumerSummary)); - - - - - final KafkaConsumerGroupClient consumerGroupClient = mock(KafkaConsumerGroupClient.class); - expect(consumerGroupClient.listGroups()).andReturn(Collections.singletonList("test-topic")); - expect(consumerGroupClient.describeConsumerGroup("test-topic")).andReturn(consumerGroupSummary); - replay(consumerGroupClient); - - final KafkaTopicsList topicsList = KafkaTopicsList.build("statement test", topicDescriptions, new KsqlConfig(Collections.EMPTY_MAP), consumerGroupClient); - - assertThat(topicsList.getTopics().size(), equalTo(1)); - final KafkaTopicInfo first = topicsList.getTopics().iterator().next(); - assertThat(first.getConsumerGroupCount(), equalTo(1)); - assertThat(first.getConsumerCount(), equalTo(1)); - assertThat(first.getReplicaInfo().size(), equalTo(1)); - - } - @Test public void testSerde() throws Exception { + // Given: final ObjectMapper mapper = JsonMapper.INSTANCE.mapper; final KafkaTopicsList expected = new KafkaTopicsList( "SHOW TOPICS;", - ImmutableList.of(new KafkaTopicInfo("thetopic", ImmutableList.of(1, 2, 3), 42, 12)) + ImmutableList.of(new KafkaTopicInfo("thetopic", ImmutableList.of(1, 2, 3))) ); + + // When: final String json = mapper.writeValueAsString(expected); + final KafkaTopicsList actual = mapper.readValue(json, KafkaTopicsList.class); + + // Then: assertEquals( "{" + "\"@type\":\"kafka_topics\"," + "\"statementText\":\"SHOW TOPICS;\"," + "\"topics\":[" - + "{\"name\":\"thetopic\",\"replicaInfo\":[1,2,3],\"consumerCount\":42,\"consumerGroupCount\":12}" + + "{\"name\":\"thetopic\",\"replicaInfo\":[1,2,3]}" + "],\"warnings\":[]}", json); - final KafkaTopicsList actual = mapper.readValue(json, KafkaTopicsList.class); assertEquals(expected, actual); } } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListTopicsExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListTopicsExecutorTest.java index 318abeb5ea80..1252a8a88346 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListTopicsExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListTopicsExecutorTest.java @@ -22,7 +22,9 @@ import com.google.common.collect.ImmutableList; import io.confluent.ksql.rest.entity.KafkaTopicInfo; +import io.confluent.ksql.rest.entity.KafkaTopicInfoExtended; import io.confluent.ksql.rest.entity.KafkaTopicsList; +import io.confluent.ksql.rest.entity.KafkaTopicsListExtended; import io.confluent.ksql.rest.server.TemporaryEngine; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.services.TestServiceContext; @@ -47,6 +49,37 @@ public void shouldListKafkaTopics() { engine.givenKafkaTopic("topic1"); engine.givenKafkaTopic("topic2"); + final AdminClient mockAdminClient = mock(AdminClient.class); + + final ServiceContext serviceContext = TestServiceContext.create( + engine.getServiceContext().getKafkaClientSupplier(), + mockAdminClient, + engine.getServiceContext().getTopicClient(), + engine.getServiceContext().getSchemaRegistryClientFactory(), + engine.getServiceContext().getConnectClient() + ); + + // When: + final KafkaTopicsList topicsList = + (KafkaTopicsList) CustomExecutors.LIST_TOPICS.execute( + engine.configure("LIST TOPICS;"), + engine.getEngine(), + serviceContext + ).orElseThrow(IllegalStateException::new); + + // Then: + assertThat(topicsList.getTopics(), containsInAnyOrder( + new KafkaTopicInfo("topic1", ImmutableList.of(1)), + new KafkaTopicInfo("topic2", ImmutableList.of(1)) + )); + } + + @Test + public void shouldListKafkaTopicsExtended() { + // Given: + engine.givenKafkaTopic("topic1"); + engine.givenKafkaTopic("topic2"); + final AdminClient mockAdminClient = mock(AdminClient.class); final ListConsumerGroupsResult result = mock(ListConsumerGroupsResult.class); final KafkaFutureImpl> groups = new KafkaFutureImpl<>(); @@ -64,17 +97,17 @@ public void shouldListKafkaTopics() { ); // When: - final KafkaTopicsList topicsList = - (KafkaTopicsList) CustomExecutors.LIST_TOPICS.execute( - engine.configure("LIST TOPICS;"), + final KafkaTopicsListExtended topicsList = + (KafkaTopicsListExtended) CustomExecutors.LIST_TOPICS.execute( + engine.configure("LIST TOPICS EXTENDED;"), engine.getEngine(), serviceContext ).orElseThrow(IllegalStateException::new); // Then: assertThat(topicsList.getTopics(), containsInAnyOrder( - new KafkaTopicInfo("topic1", ImmutableList.of(1), 0, 0), - new KafkaTopicInfo("topic2", ImmutableList.of(1), 0, 0) + new KafkaTopicInfoExtended("topic1", ImmutableList.of(1), 0, 0), + new KafkaTopicInfoExtended("topic2", ImmutableList.of(1), 0, 0) )); }