diff --git a/ksqldb-cli/src/main/java/io/confluent/ksql/cli/console/Console.java b/ksqldb-cli/src/main/java/io/confluent/ksql/cli/console/Console.java index b6a0fb0b6add..63479e0bb16b 100644 --- a/ksqldb-cli/src/main/java/io/confluent/ksql/cli/console/Console.java +++ b/ksqldb-cli/src/main/java/io/confluent/ksql/cli/console/Console.java @@ -15,12 +15,17 @@ package io.confluent.ksql.cli.console; +import static com.google.common.collect.ImmutableListMultimap.flatteningToImmutableListMultimap; +import static com.google.common.collect.ImmutableListMultimap.toImmutableListMultimap; import static io.confluent.ksql.util.CmdLineUtil.splitByUnquotedWhitespace; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Functions; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableListMultimap; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; +import com.google.common.collect.Streams; import io.confluent.ksql.cli.console.CliConfig.OnOff; import io.confluent.ksql.cli.console.KsqlTerminal.HistoryEntry; import io.confluent.ksql.cli.console.KsqlTerminal.StatusClosable; @@ -44,7 +49,7 @@ import io.confluent.ksql.cli.console.table.builder.TopicDescriptionTableBuilder; import io.confluent.ksql.cli.console.table.builder.TypeListTableBuilder; import io.confluent.ksql.cli.console.table.builder.WarningEntityTableBuilder; -import io.confluent.ksql.metrics.MetricCollectors; +import io.confluent.ksql.metrics.TopicSensors.Stat; import io.confluent.ksql.model.WindowType; import io.confluent.ksql.query.QueryError; import io.confluent.ksql.rest.ApiJsonMapper; @@ -65,6 +70,7 @@ import io.confluent.ksql.rest.entity.KafkaTopicsListExtended; import io.confluent.ksql.rest.entity.KsqlEntity; import io.confluent.ksql.rest.entity.KsqlErrorMessage; +import io.confluent.ksql.rest.entity.KsqlHostInfoEntity; import io.confluent.ksql.rest.entity.KsqlStatementErrorMessage; import io.confluent.ksql.rest.entity.KsqlWarning; import io.confluent.ksql.rest.entity.PropertiesList; @@ -106,6 +112,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -116,6 +123,7 @@ import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.common.config.ConfigException; @@ -637,6 +645,80 @@ private void printQueryError(final QueryDescription query) { } } + private void printStatistics(final SourceDescription source) { + final ImmutableListMultimap clusterStats = source + .getClusterStatistics() + .entrySet() + .stream() + .collect(flatteningToImmutableListMultimap( + Entry::getKey, + (e) -> e.getValue().values().stream() + )); + final ImmutableListMultimap clusterErrors = source + .getClusterErrorStats() + .entrySet() + .stream() + .collect(flatteningToImmutableListMultimap( + Entry::getKey, + (e) -> e.getValue().values().stream() + )); + + final ImmutableListMultimap statisticsMap = source.getStatisticsMap() + .values() + .stream() + .collect(toImmutableListMultimap( + Functions.constant(new KsqlHostInfoEntity("", 0)), + Functions.identity() + )); + final ImmutableListMultimap errorStatsMap = source.getErrorStatsMap() + .values() + .stream() + .collect(toImmutableListMultimap( + Functions.constant(new KsqlHostInfoEntity("", 0)), + Functions.identity() + )); + + + if (statisticsMap.isEmpty() && errorStatsMap.isEmpty()) { + writer().println(String.format( + "%n%-20s%n%s", + "Local runtime statistics", + "------------------------" + )); + writer().println(source.getStatistics()); + writer().println(source.getErrorStats()); + return; + } + final boolean printLocalOnly = clusterStats.isEmpty() && clusterErrors.isEmpty(); + final List headers = ImmutableList.of("Host", "Metric", "Value", "Last Message"); + final Stream> rows = printLocalOnly + ? Streams.concat(statisticsMap.entries().stream(), errorStatsMap.entries().stream()) + : Streams.concat(clusterStats.entries().stream(), clusterErrors.entries().stream()); + + + writer().println(String.format( + "%n%-20s%n%s", + printLocalOnly ? "Local runtime statistics" : "Runtime statistics by host", + "-------------------------" + )); + final Table statsTable = new Table.Builder() + .withColumnHeaders(headers) + .withRows(rows + .sorted(Comparator + .comparing((Map.Entry e) -> e.getKey().toString()) + .thenComparing((Map.Entry e) -> e.getValue().name()) + ) + .map((e) -> { + final KsqlHostInfoEntity host = e.getKey(); + final Stat metric = e.getValue(); + final String hostCell = host.getHost().equals("") ? "--" : host.toString(); + final String formattedValue = String.format("%10.0f", metric.getValue()); + return ImmutableList.of(hostCell, metric.name(), formattedValue, metric.timestamp()); + })) + .build(); + statsTable.print(this); + } + private void printSourceDescription(final SourceDescription source) { final boolean isTable = source.getType().equalsIgnoreCase("TABLE"); @@ -660,14 +742,8 @@ private void printSourceDescription(final SourceDescription source) { printQueries(source.getReadQueries(), source.getType(), "read"); printQueries(source.getWriteQueries(), source.getType(), "write"); + printStatistics(source); - writer().println(String.format( - "%n%-20s%n%s", - "Local runtime statistics", - "------------------------" - )); - writer().println(MetricCollectors.format(source.getStatisticsMap().values(), "last-message")); - writer().println(MetricCollectors.format(source.getErrorStatsMap().values(), "last-message")); writer().println(String.format( "(%s)", "Statistics of the local KSQL server interaction with the Kafka topic " @@ -946,6 +1022,7 @@ private static final class CliCmdExecutor { private final CliSpecificCommand cmd; private final List args; + private static CliCmdExecutor of(final CliSpecificCommand cmd, final List lineParts) { final String[] nameParts = cmd.getName().split("\\s+"); final List argList = lineParts.subList(nameParts.length, lineParts.size()).stream() diff --git a/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java index c5bfe32e9e1a..dead641a1c65 100644 --- a/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java +++ b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java @@ -15,6 +15,7 @@ package io.confluent.ksql.cli.console; +import static com.google.common.collect.ImmutableMap.toImmutableMap; import static io.confluent.ksql.GenericRow.genericRow; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; @@ -26,7 +27,9 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; + import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Functions; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -101,6 +104,7 @@ import java.util.Optional; import java.util.TimeZone; import java.util.function.Supplier; +import java.util.stream.IntStream; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo.ConnectorState; @@ -127,14 +131,14 @@ public class ConsoleTest { private static final String NEWLINE = System.lineSeparator(); private static final String STATUS_COUNT_STRING = "RUNNING:1,ERROR:2"; private static final String AGGREGATE_STATUS = "ERROR"; - protected static final Stat STAT = new Stat("TEST", 0, 0); - protected static final ImmutableMap IMMUTABLE_MAP = new ImmutableMap.Builder().put("TEST", STAT).build(); - + protected static final Stat STAT = new Stat("TEST", 0, 1596644936314L); + protected static final Stat ERROR_STAT = new Stat("ERROR", 0, 1596644936314L); + protected static final ImmutableMap NODE_STATS = new ImmutableMap.Builder().put("TEST", STAT).build(); + protected static final ImmutableMap NODE_ERRORS = new ImmutableMap.Builder().put("ERROR", ERROR_STAT).build(); private static final LogicalSchema SCHEMA = LogicalSchema.builder() .keyColumn(ColumnName.of("foo"), SqlTypes.INTEGER) .valueColumn(ColumnName.of("bar"), SqlTypes.STRING) .build().withPseudoAndKeyColsInValue(false); - private final TestTerminal terminal; private final Console console; private final Supplier lineSupplier; @@ -150,8 +154,8 @@ public class ConsoleTest { "2000-01-01", "stats", "errors", - IMMUTABLE_MAP, - IMMUTABLE_MAP, + NODE_STATS, + NODE_ERRORS, true, "kafka", "avro", @@ -186,6 +190,7 @@ public ConsoleTest(final OutputFormat outputFormat) { } + @Before public void setUp() { MockitoAnnotations.initMocks(this); @@ -298,7 +303,7 @@ public void testPrintPropertyList() { final KsqlEntityList entityList = new KsqlEntityList(ImmutableList.of( new PropertiesList("e", properties, Collections.emptyList(), Collections.emptyList()) - )); + )); // When: console.printKsqlEntityList(entityList); @@ -307,6 +312,7 @@ public void testPrintPropertyList() { final String output = terminal.getOutputString(); Approvals.verify(output, approvalOptions); } + @Test public void testPrintQueries() { // Given: @@ -371,6 +377,48 @@ public void shouldPrintExplainQueryWithError() { Approvals.verify(output, approvalOptions); } + private SourceDescription buildSourceDescription( + final List readQueries, + final List writeQueries, + final List fields, + final boolean withClusterStats) { + final ImmutableMap> statistics = IntStream.range(1, 5) + .boxed() + .collect(toImmutableMap( + (Integer i) -> new KsqlHostInfoEntity("host" + i, 8000 + i), + Functions.constant(NODE_STATS))); + final ImmutableMap> errors = IntStream.range(1, 5) + .boxed() + .collect(toImmutableMap( + (Integer i) -> new KsqlHostInfoEntity("host" + i, 8000 + i), + Functions.constant(NODE_ERRORS))); + + return new SourceDescription( + "TestSource", + Optional.empty(), + readQueries, + writeQueries, + fields, + DataSourceType.KTABLE.getKsqlType(), + "2000-01-01", + "stats", + "errors", + NODE_STATS, + NODE_ERRORS, + true, + "kafka", + "avro", + "kafka-topic", + 1, + 1, + "sql statement", + Collections.emptyList(), + Collections.emptyList(), + withClusterStats ? statistics : ImmutableMap.of(), + withClusterStats ? errors : ImmutableMap.of() + ); + } + @Test public void testPrintSourceDescription() { // Given: @@ -397,27 +445,46 @@ public void testPrintSourceDescription() { final KsqlEntityList entityList = new KsqlEntityList(ImmutableList.of( new SourceDescriptionEntity( "some sql", - new SourceDescription( - "TestSource", - Optional.empty(), - readQueries, - writeQueries, - fields, - DataSourceType.KTABLE.getKsqlType(), - "2000-01-01", - "stats", - "errors", - IMMUTABLE_MAP, - IMMUTABLE_MAP, - false, - "kafka", - "avro", - "kafka-topic", - 1, - 1, - "sql statement", - Collections.emptyList(), - Collections.emptyList()), + buildSourceDescription( readQueries, writeQueries,fields, false), + Collections.emptyList() + ) + )); + + // When: + console.printKsqlEntityList(entityList); + + // Then: + final String output = terminal.getOutputString(); + Approvals.verify(output, approvalOptions); + } + + @Test + public void testPrintSourceDescriptionWithClusterStats() { + // Given: + final List fields = buildTestSchema( + SqlTypes.BOOLEAN, + SqlTypes.INTEGER, + SqlTypes.BIGINT, + SqlTypes.DOUBLE, + SqlTypes.STRING, + SqlTypes.array(SqlTypes.STRING), + SqlTypes.map(SqlTypes.STRING, SqlTypes.BIGINT), + SqlTypes.struct() + .field("a", SqlTypes.DOUBLE) + .build() + ); + + final List readQueries = ImmutableList.of( + new RunningQuery("read query", ImmutableSet.of("sink1"), ImmutableSet.of("sink1 topic"), new QueryId("readId"), queryStatusCount, KsqlConstants.KsqlQueryType.PERSISTENT) + ); + final List writeQueries = ImmutableList.of( + new RunningQuery("write query", ImmutableSet.of("sink2"), ImmutableSet.of("sink2 topic"), new QueryId("writeId"), queryStatusCount, KsqlConstants.KsqlQueryType.PERSISTENT) + ); + + final KsqlEntityList entityList = new KsqlEntityList(ImmutableList.of( + new SourceDescriptionEntity( + "some sql", + buildSourceDescription(readQueries, writeQueries,fields, true), Collections.emptyList() ) )); @@ -519,7 +586,7 @@ public void shouldPrintConnectorsList() { ImmutableList.of( new SimpleConnectorInfo("foo", ConnectorType.SOURCE, "clazz", "STATUS"), new SimpleConnectorInfo("bar", null, null, null) - )) + )) )); // When: @@ -545,11 +612,11 @@ public void shouldPrintTypesList() { new FieldInfo("f1", new SchemaInfo(SqlBaseType.STRING, null, null), Optional.empty())), null), "typeC", new SchemaInfo( - SqlBaseType.DECIMAL, - null, - null, - ImmutableMap.of("precision", 10, "scale", 9) - ) + SqlBaseType.DECIMAL, + null, + null, + ImmutableMap.of("precision", 10, "scale", 9) + ) )) )); @@ -599,8 +666,8 @@ public void shouldPrintTopicDescribeExtended() { "2000-01-01", "stats", "errors", - IMMUTABLE_MAP, - IMMUTABLE_MAP, + NODE_STATS, + NODE_ERRORS, true, "json", "avro", @@ -805,7 +872,7 @@ public void shouldSupportOtherWhitespaceBetweenCliCommandAndArgs() { public void shouldSupportCmdBeingTerminatedWithSemiColon() { // Given: when(lineSupplier.get()) - .thenReturn(CLI_CMD_NAME + WHITE_SPACE + "Arg0;") + .thenReturn(CLI_CMD_NAME + WHITE_SPACE + "Arg0;") .thenReturn("not a CLI command;"); // When: @@ -833,7 +900,7 @@ public void shouldSupportCmdBeingTerminatedWithSemiColonAndWhitespace() { public void shouldSupportCmdWithQuotedArgBeingTerminatedWithSemiColon() { // Given: when(lineSupplier.get()) - .thenReturn(CLI_CMD_NAME + WHITE_SPACE + "'Arg0';") + .thenReturn(CLI_CMD_NAME + WHITE_SPACE + "'Arg0';") .thenReturn("not a CLI command;"); // When: diff --git a/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.shouldPrintTopicDescribeExtended.approved.json b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.shouldPrintTopicDescribeExtended.approved.json index 55e1129388a7..e7378e1a3b0a 100644 --- a/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.shouldPrintTopicDescribeExtended.approved.json +++ b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.shouldPrintTopicDescribeExtended.approved.json @@ -52,14 +52,14 @@ "TEST" : { "name" : "TEST", "value" : 0.0, - "timestamp" : 0 + "timestamp" : 1596644936314 } }, "errorStatsMap" : { - "TEST" : { - "name" : "TEST", + "ERROR" : { + "name" : "ERROR", "value" : 0.0, - "timestamp" : 0 + "timestamp" : 1596644936314 } }, "extended" : true, diff --git a/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.shouldPrintTopicDescribeExtended.approved.tabular b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.shouldPrintTopicDescribeExtended.approved.tabular index 3f6784c22bf7..bbf64497d355 100644 --- a/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.shouldPrintTopicDescribeExtended.approved.tabular +++ b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.shouldPrintTopicDescribeExtended.approved.tabular @@ -31,9 +31,12 @@ writeId (ERROR) : write query For query topology and execution plan please run: EXPLAIN Local runtime statistics ------------------------- - TEST: 0 last-message: n/a - TEST: 0 last-message: n/a +------------------------- + Host | Metric | Value | Last Message +------------------------------------------------------- + -- | ERROR | 0 | 2020-08-05T16:28:56.314Z + -- | TEST | 0 | 2020-08-05T16:28:56.314Z +------------------------------------------------------- (Statistics of the local KSQL server interaction with the Kafka topic kafka-topic) Consumer Groups summary: diff --git a/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.shouldPrintWarnings.approved.json b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.shouldPrintWarnings.approved.json index 62e0831855c1..882336634ba2 100644 --- a/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.shouldPrintWarnings.approved.json +++ b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.shouldPrintWarnings.approved.json @@ -36,14 +36,14 @@ "TEST" : { "name" : "TEST", "value" : 0.0, - "timestamp" : 0 + "timestamp" : 1596644936314 } }, "errorStatsMap" : { - "TEST" : { - "name" : "TEST", + "ERROR" : { + "name" : "ERROR", "value" : 0.0, - "timestamp" : 0 + "timestamp" : 1596644936314 } }, "extended" : true, diff --git a/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.shouldPrintWarnings.approved.tabular b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.shouldPrintWarnings.approved.tabular index 8e47ee15e8f9..0482a88636c0 100644 --- a/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.shouldPrintWarnings.approved.tabular +++ b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.shouldPrintWarnings.approved.tabular @@ -15,9 +15,12 @@ Statement : statement ----------------------------------------- Local runtime statistics ------------------------- - TEST: 0 last-message: n/a - TEST: 0 last-message: n/a +------------------------- + Host | Metric | Value | Last Message +------------------------------------------------------- + -- | ERROR | 0 | 2020-08-05T16:28:56.314Z + -- | TEST | 0 | 2020-08-05T16:28:56.314Z +------------------------------------------------------- (Statistics of the local KSQL server interaction with the Kafka topic kafka-topic) WARNING: oops WARNING: doh! diff --git a/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.testPrintConnectorDescription.approved.json b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.testPrintConnectorDescription.approved.json index e29cecaf19c4..37ee29fe35d4 100644 --- a/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.testPrintConnectorDescription.approved.json +++ b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.testPrintConnectorDescription.approved.json @@ -53,14 +53,14 @@ "TEST" : { "name" : "TEST", "value" : 0.0, - "timestamp" : 0 + "timestamp" : 1596644936314 } }, "errorStatsMap" : { - "TEST" : { - "name" : "TEST", + "ERROR" : { + "name" : "ERROR", "value" : 0.0, - "timestamp" : 0 + "timestamp" : 1596644936314 } }, "extended" : true, diff --git a/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.testPrintSourceDescription.approved.json b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.testPrintSourceDescription.approved.json index 1bf969e9403c..228c21c4824d 100644 --- a/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.testPrintSourceDescription.approved.json +++ b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.testPrintSourceDescription.approved.json @@ -116,17 +116,17 @@ "TEST" : { "name" : "TEST", "value" : 0.0, - "timestamp" : 0 + "timestamp" : 1596644936314 } }, "errorStatsMap" : { - "TEST" : { - "name" : "TEST", + "ERROR" : { + "name" : "ERROR", "value" : 0.0, - "timestamp" : 0 + "timestamp" : 1596644936314 } }, - "extended" : false, + "extended" : true, "keyFormat" : "kafka", "valueFormat" : "avro", "topic" : "kafka-topic", diff --git a/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.testPrintSourceDescription.approved.tabular b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.testPrintSourceDescription.approved.tabular index 6abe2f7a9d89..352313ac663c 100644 --- a/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.testPrintSourceDescription.approved.tabular +++ b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.testPrintSourceDescription.approved.tabular @@ -1,5 +1,12 @@ Name : TestSource +Type : TABLE +Timestamp field : 2000-01-01 +Key format : kafka +Value format : avro +Kafka topic : kafka-topic (partitions: 1, replication: 1) +Statement : sql statement + Field | Type ----------------------------------------- ROWKEY | VARCHAR(STRING) (primary key) @@ -12,4 +19,24 @@ Name : TestSource f_6 | MAP f_7 | STRUCT ----------------------------------------- -For runtime statistics and query details run: DESCRIBE EXTENDED; + +Queries that read from this TABLE +----------------------------------- +readId (ERROR) : read query + +For query topology and execution plan please run: EXPLAIN + +Queries that write from this TABLE +----------------------------------- +writeId (ERROR) : write query + +For query topology and execution plan please run: EXPLAIN + +Local runtime statistics +------------------------- + Host | Metric | Value | Last Message +------------------------------------------------------- + -- | ERROR | 0 | 2020-08-05T16:28:56.314Z + -- | TEST | 0 | 2020-08-05T16:28:56.314Z +------------------------------------------------------- +(Statistics of the local KSQL server interaction with the Kafka topic kafka-topic) diff --git a/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.testPrintSourceDescriptionWithClusterStats.approved.json b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.testPrintSourceDescriptionWithClusterStats.approved.json new file mode 100644 index 000000000000..947b8be2cfd6 --- /dev/null +++ b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.testPrintSourceDescriptionWithClusterStats.approved.json @@ -0,0 +1,200 @@ +[ { + "@type" : "sourceDescription", + "statementText" : "some sql", + "sourceDescription" : { + "name" : "TestSource", + "windowType" : null, + "readQueries" : [ { + "queryString" : "read query", + "sinks" : [ "sink1" ], + "sinkKafkaTopics" : [ "sink1 topic" ], + "id" : "readId", + "statusCount" : { + "RUNNING" : 1, + "ERROR" : 2 + }, + "queryType" : "PERSISTENT", + "state" : "ERROR" + } ], + "writeQueries" : [ { + "queryString" : "write query", + "sinks" : [ "sink2" ], + "sinkKafkaTopics" : [ "sink2 topic" ], + "id" : "writeId", + "statusCount" : { + "RUNNING" : 1, + "ERROR" : 2 + }, + "queryType" : "PERSISTENT", + "state" : "ERROR" + } ], + "fields" : [ { + "name" : "ROWKEY", + "schema" : { + "type" : "STRING", + "fields" : null, + "memberSchema" : null + }, + "type" : "KEY" + }, { + "name" : "f_0", + "schema" : { + "type" : "BOOLEAN", + "fields" : null, + "memberSchema" : null + } + }, { + "name" : "f_1", + "schema" : { + "type" : "INTEGER", + "fields" : null, + "memberSchema" : null + } + }, { + "name" : "f_2", + "schema" : { + "type" : "BIGINT", + "fields" : null, + "memberSchema" : null + } + }, { + "name" : "f_3", + "schema" : { + "type" : "DOUBLE", + "fields" : null, + "memberSchema" : null + } + }, { + "name" : "f_4", + "schema" : { + "type" : "STRING", + "fields" : null, + "memberSchema" : null + } + }, { + "name" : "f_5", + "schema" : { + "type" : "ARRAY", + "fields" : null, + "memberSchema" : { + "type" : "STRING", + "fields" : null, + "memberSchema" : null + } + } + }, { + "name" : "f_6", + "schema" : { + "type" : "MAP", + "fields" : null, + "memberSchema" : { + "type" : "BIGINT", + "fields" : null, + "memberSchema" : null + } + } + }, { + "name" : "f_7", + "schema" : { + "type" : "STRUCT", + "fields" : [ { + "name" : "a", + "schema" : { + "type" : "DOUBLE", + "fields" : null, + "memberSchema" : null + } + } ], + "memberSchema" : null + } + } ], + "type" : "TABLE", + "timestamp" : "2000-01-01", + "statistics" : "The statistics field is deprecated and will be removed in a future version of ksql. Please update your client to the latest version and use statisticsMap instead.\nstats", + "errorStats" : "The errorStats field is deprecated and will be removed in a future version of ksql. Please update your client to the latest version and use errorStatsMap instead.\nerrors\n", + "statisticsMap" : { + "TEST" : { + "name" : "TEST", + "value" : 0.0, + "timestamp" : 1596644936314 + } + }, + "errorStatsMap" : { + "ERROR" : { + "name" : "ERROR", + "value" : 0.0, + "timestamp" : 1596644936314 + } + }, + "extended" : true, + "keyFormat" : "kafka", + "valueFormat" : "avro", + "topic" : "kafka-topic", + "partitions" : 1, + "replication" : 1, + "statement" : "sql statement", + "queryOffsetSummaries" : [ ], + "sourceConstraints" : [ ], + "clusterStatistics" : { + "host1:8001" : { + "TEST" : { + "name" : "TEST", + "value" : 0.0, + "timestamp" : 1596644936314 + } + }, + "host2:8002" : { + "TEST" : { + "name" : "TEST", + "value" : 0.0, + "timestamp" : 1596644936314 + } + }, + "host3:8003" : { + "TEST" : { + "name" : "TEST", + "value" : 0.0, + "timestamp" : 1596644936314 + } + }, + "host4:8004" : { + "TEST" : { + "name" : "TEST", + "value" : 0.0, + "timestamp" : 1596644936314 + } + } + }, + "clusterErrorStats" : { + "host1:8001" : { + "ERROR" : { + "name" : "ERROR", + "value" : 0.0, + "timestamp" : 1596644936314 + } + }, + "host2:8002" : { + "ERROR" : { + "name" : "ERROR", + "value" : 0.0, + "timestamp" : 1596644936314 + } + }, + "host3:8003" : { + "ERROR" : { + "name" : "ERROR", + "value" : 0.0, + "timestamp" : 1596644936314 + } + }, + "host4:8004" : { + "ERROR" : { + "name" : "ERROR", + "value" : 0.0, + "timestamp" : 1596644936314 + } + } + } + }, + "warnings" : [ ] +} ] diff --git a/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.testPrintSourceDescriptionWithClusterStats.approved.tabular b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.testPrintSourceDescriptionWithClusterStats.approved.tabular new file mode 100644 index 000000000000..14e79821792a --- /dev/null +++ b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.testPrintSourceDescriptionWithClusterStats.approved.tabular @@ -0,0 +1,48 @@ + +Name : TestSource +Type : TABLE +Timestamp field : 2000-01-01 +Key format : kafka +Value format : avro +Kafka topic : kafka-topic (partitions: 1, replication: 1) +Statement : sql statement + + Field | Type +----------------------------------------- + ROWKEY | VARCHAR(STRING) (primary key) + f_0 | BOOLEAN + f_1 | INTEGER + f_2 | BIGINT + f_3 | DOUBLE + f_4 | VARCHAR(STRING) + f_5 | ARRAY + f_6 | MAP + f_7 | STRUCT +----------------------------------------- + +Queries that read from this TABLE +----------------------------------- +readId (ERROR) : read query + +For query topology and execution plan please run: EXPLAIN + +Queries that write from this TABLE +----------------------------------- +writeId (ERROR) : write query + +For query topology and execution plan please run: EXPLAIN + +Runtime statistics by host +------------------------- + Host | Metric | Value | Last Message +------------------------------------------------------------- + host1:8001 | ERROR | 0 | 2020-08-05T16:28:56.314Z + host1:8001 | TEST | 0 | 2020-08-05T16:28:56.314Z + host2:8002 | ERROR | 0 | 2020-08-05T16:28:56.314Z + host2:8002 | TEST | 0 | 2020-08-05T16:28:56.314Z + host3:8003 | ERROR | 0 | 2020-08-05T16:28:56.314Z + host3:8003 | TEST | 0 | 2020-08-05T16:28:56.314Z + host4:8004 | ERROR | 0 | 2020-08-05T16:28:56.314Z + host4:8004 | TEST | 0 | 2020-08-05T16:28:56.314Z +------------------------------------------------------------- +(Statistics of the local KSQL server interaction with the Kafka topic kafka-topic)