diff --git a/ksqldb-cli/src/main/java/io/confluent/ksql/cli/console/Console.java b/ksqldb-cli/src/main/java/io/confluent/ksql/cli/console/Console.java index 39d613c76a95..0b0348647601 100644 --- a/ksqldb-cli/src/main/java/io/confluent/ksql/cli/console/Console.java +++ b/ksqldb-cli/src/main/java/io/confluent/ksql/cli/console/Console.java @@ -503,6 +503,13 @@ private static String formatFieldType( ) { final FieldType possibleFieldType = field.getType().orElse(null); + if (possibleFieldType == FieldType.HEADER) { + final String headerType = field.getHeaderKey() + .map(k -> "(header('" + k + "'))") + .orElse("(headers)"); + return String.format("%-16s %s", field.getSchema().toTypeString(), headerType); + } + if (possibleFieldType == FieldType.KEY) { final String wt = windowType .map(v -> " (Window type: " + v + ")") diff --git a/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java index 8b326b22a119..3461fd63090d 100644 --- a/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java +++ b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java @@ -187,7 +187,21 @@ public static Collection data() { return ImmutableList.of(OutputFormat.JSON, OutputFormat.TABULAR); } + private static List buildTestSchema(final Optional headerKey, final SqlType... fieldTypes) { + final LogicalSchema schema = builderWithoutHeaders(fieldTypes) + .headerColumn(ColumnName.of("HEAD"), headerKey) + .build(); + + return EntityUtil.buildSourceSchemaEntity(schema); + } + private static List buildTestSchema(final SqlType... fieldTypes) { + final LogicalSchema schema = builderWithoutHeaders(fieldTypes).build(); + + return EntityUtil.buildSourceSchemaEntity(schema); + } + + private static Builder builderWithoutHeaders(final SqlType... fieldTypes) { final Builder schemaBuilder = LogicalSchema.builder() .keyColumn(SystemColumns.ROWKEY_NAME, SqlTypes.STRING); @@ -195,9 +209,7 @@ private static List buildTestSchema(final SqlType... fieldTypes) { schemaBuilder.valueColumn(ColumnName.of("f_" + idx), fieldTypes[idx]); } - final LogicalSchema schema = schemaBuilder.build(); - - return EntityUtil.buildSourceSchemaEntity(schema); + return schemaBuilder; } @Before @@ -468,6 +480,86 @@ public void testPrintSourceDescription() { Approvals.verify(output, approvalOptions); } + @Test + public void testPrintSourceDescriptionWithHeaders() { + // Given: + final List fields = buildTestSchema( + Optional.empty(), + SqlTypes.BOOLEAN, + SqlTypes.INTEGER, + SqlTypes.BIGINT, + SqlTypes.DOUBLE, + SqlTypes.STRING, + SqlTypes.array(SqlTypes.STRING), + SqlTypes.map(SqlTypes.STRING, SqlTypes.BIGINT), + SqlTypes.struct() + .field("a", SqlTypes.DOUBLE) + .build() + ); + + final List readQueries = ImmutableList.of( + new RunningQuery("read query", ImmutableSet.of("sink1"), ImmutableSet.of("sink1 topic"), new QueryId("readId"), queryStatusCount, KsqlConstants.KsqlQueryType.PERSISTENT) + ); + final List writeQueries = ImmutableList.of( + new RunningQuery("write query", ImmutableSet.of("sink2"), ImmutableSet.of("sink2 topic"), new QueryId("writeId"), queryStatusCount, KsqlConstants.KsqlQueryType.PERSISTENT) + ); + + final KsqlEntityList entityList = new KsqlEntityList(ImmutableList.of( + new SourceDescriptionEntity( + "some sql", + buildSourceDescription(readQueries, writeQueries, fields, false), + Collections.emptyList() + ) + )); + + // When: + console.printKsqlEntityList(entityList); + + // Then: + final String output = terminal.getOutputString(); + Approvals.verify(output, approvalOptions); + } + + @Test + public void testPrintSourceDescriptionWithExtractedHeader() { + // Given: + final List fields = buildTestSchema( + Optional.of("abc"), + SqlTypes.BOOLEAN, + SqlTypes.INTEGER, + SqlTypes.BIGINT, + SqlTypes.DOUBLE, + SqlTypes.STRING, + SqlTypes.array(SqlTypes.STRING), + SqlTypes.map(SqlTypes.STRING, SqlTypes.BIGINT), + SqlTypes.struct() + .field("a", SqlTypes.DOUBLE) + .build() + ); + + final List readQueries = ImmutableList.of( + new RunningQuery("read query", ImmutableSet.of("sink1"), ImmutableSet.of("sink1 topic"), new QueryId("readId"), queryStatusCount, KsqlConstants.KsqlQueryType.PERSISTENT) + ); + final List writeQueries = ImmutableList.of( + new RunningQuery("write query", ImmutableSet.of("sink2"), ImmutableSet.of("sink2 topic"), new QueryId("writeId"), queryStatusCount, KsqlConstants.KsqlQueryType.PERSISTENT) + ); + + final KsqlEntityList entityList = new KsqlEntityList(ImmutableList.of( + new SourceDescriptionEntity( + "some sql", + buildSourceDescription(readQueries, writeQueries, fields, false), + Collections.emptyList() + ) + )); + + // When: + console.printKsqlEntityList(entityList); + + // Then: + final String output = terminal.getOutputString(); + Approvals.verify(output, approvalOptions); + } + @Test public void testPrintSourceDescriptionWithClusterStats() { // Given: diff --git a/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.testPrintSourceDescriptionWithExtractedHeader.approved.json b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.testPrintSourceDescriptionWithExtractedHeader.approved.json new file mode 100644 index 000000000000..d6fb7f1a32a6 --- /dev/null +++ b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.testPrintSourceDescriptionWithExtractedHeader.approved.json @@ -0,0 +1,137 @@ +[ { + "@type" : "sourceDescription", + "statementText" : "some sql", + "sourceDescription" : { + "name" : "TestSource", + "windowType" : null, + "readQueries" : [ { + "queryString" : "read query", + "sinks" : [ "sink1" ], + "sinkKafkaTopics" : [ "sink1 topic" ], + "id" : "readId", + "statusCount" : { + "RUNNING" : 1, + "ERROR" : 2 + }, + "queryType" : "PERSISTENT", + "state" : "ERROR" + } ], + "writeQueries" : [ { + "queryString" : "write query", + "sinks" : [ "sink2" ], + "sinkKafkaTopics" : [ "sink2 topic" ], + "id" : "writeId", + "statusCount" : { + "RUNNING" : 1, + "ERROR" : 2 + }, + "queryType" : "PERSISTENT", + "state" : "ERROR" + } ], + "fields" : [ { + "name" : "ROWKEY", + "schema" : { + "type" : "STRING", + "fields" : null, + "memberSchema" : null + }, + "type" : "KEY" + }, { + "name" : "f_0", + "schema" : { + "type" : "BOOLEAN", + "fields" : null, + "memberSchema" : null + } + }, { + "name" : "f_1", + "schema" : { + "type" : "INTEGER", + "fields" : null, + "memberSchema" : null + } + }, { + "name" : "f_2", + "schema" : { + "type" : "BIGINT", + "fields" : null, + "memberSchema" : null + } + }, { + "name" : "f_3", + "schema" : { + "type" : "DOUBLE", + "fields" : null, + "memberSchema" : null + } + }, { + "name" : "f_4", + "schema" : { + "type" : "STRING", + "fields" : null, + "memberSchema" : null + } + }, { + "name" : "f_5", + "schema" : { + "type" : "ARRAY", + "fields" : null, + "memberSchema" : { + "type" : "STRING", + "fields" : null, + "memberSchema" : null + } + } + }, { + "name" : "f_6", + "schema" : { + "type" : "MAP", + "fields" : null, + "memberSchema" : { + "type" : "BIGINT", + "fields" : null, + "memberSchema" : null + } + } + }, { + "name" : "f_7", + "schema" : { + "type" : "STRUCT", + "fields" : [ { + "name" : "a", + "schema" : { + "type" : "DOUBLE", + "fields" : null, + "memberSchema" : null + } + } ], + "memberSchema" : null + } + }, { + "name" : "HEAD", + "schema" : { + "type" : "BYTES", + "fields" : null, + "memberSchema" : null + }, + "headerKey" : "abc", + "type" : "HEADER" + } ], + "type" : "TABLE", + "timestamp" : "2000-01-01", + "statistics" : "The statistics field is deprecated and will be removed in a future version of ksql. Please update your client to the latest version and use statisticsMap instead.\nstats", + "errorStats" : "The errorStats field is deprecated and will be removed in a future version of ksql. Please update your client to the latest version and use errorStatsMap instead.\nerrors\n", + "extended" : true, + "keyFormat" : "kafka", + "valueFormat" : "avro", + "topic" : "kafka-topic", + "partitions" : 1, + "replication" : 1, + "statement" : "sql statement", + "queryOffsetSummaries" : [ ], + "sourceConstraints" : [ ], + "clusterStatistics" : [ ], + "clusterErrorStats" : [ ] + }, + "warnings" : [ ] +} ] diff --git a/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.testPrintSourceDescriptionWithExtractedHeader.approved.tabular b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.testPrintSourceDescriptionWithExtractedHeader.approved.tabular new file mode 100644 index 000000000000..914f5df306ad --- /dev/null +++ b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.testPrintSourceDescriptionWithExtractedHeader.approved.tabular @@ -0,0 +1,43 @@ + +Name : TestSource +Type : TABLE +Timestamp field : 2000-01-01 +Key format : kafka +Value format : avro +Kafka topic : kafka-topic (partitions: 1, replication: 1) +Statement : sql statement + + Field | Type +------------------------------------------- + ROWKEY | VARCHAR(STRING) (primary key) + f_0 | BOOLEAN + f_1 | INTEGER + f_2 | BIGINT + f_3 | DOUBLE + f_4 | VARCHAR(STRING) + f_5 | ARRAY + f_6 | MAP + f_7 | STRUCT + HEAD | BYTES (header('abc')) +------------------------------------------- + +Queries that read from this TABLE +----------------------------------- +readId (ERROR) : read query + +For query topology and execution plan please run: EXPLAIN + +Queries that write from this TABLE +----------------------------------- +writeId (ERROR) : write query + +For query topology and execution plan please run: EXPLAIN + +Local runtime statistics +------------------------ +The statistics field is deprecated and will be removed in a future version of ksql. Please update your client to the latest version and use statisticsMap instead. +stats +The errorStats field is deprecated and will be removed in a future version of ksql. Please update your client to the latest version and use errorStatsMap instead. +errors + +(Statistics of the local KSQL server interaction with the Kafka topic kafka-topic) diff --git a/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.testPrintSourceDescriptionWithHeaders.approved.json b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.testPrintSourceDescriptionWithHeaders.approved.json new file mode 100644 index 000000000000..c4aecfd2c0be --- /dev/null +++ b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.testPrintSourceDescriptionWithHeaders.approved.json @@ -0,0 +1,154 @@ +[ { + "@type" : "sourceDescription", + "statementText" : "some sql", + "sourceDescription" : { + "name" : "TestSource", + "windowType" : null, + "readQueries" : [ { + "queryString" : "read query", + "sinks" : [ "sink1" ], + "sinkKafkaTopics" : [ "sink1 topic" ], + "id" : "readId", + "statusCount" : { + "RUNNING" : 1, + "ERROR" : 2 + }, + "queryType" : "PERSISTENT", + "state" : "ERROR" + } ], + "writeQueries" : [ { + "queryString" : "write query", + "sinks" : [ "sink2" ], + "sinkKafkaTopics" : [ "sink2 topic" ], + "id" : "writeId", + "statusCount" : { + "RUNNING" : 1, + "ERROR" : 2 + }, + "queryType" : "PERSISTENT", + "state" : "ERROR" + } ], + "fields" : [ { + "name" : "ROWKEY", + "schema" : { + "type" : "STRING", + "fields" : null, + "memberSchema" : null + }, + "type" : "KEY" + }, { + "name" : "f_0", + "schema" : { + "type" : "BOOLEAN", + "fields" : null, + "memberSchema" : null + } + }, { + "name" : "f_1", + "schema" : { + "type" : "INTEGER", + "fields" : null, + "memberSchema" : null + } + }, { + "name" : "f_2", + "schema" : { + "type" : "BIGINT", + "fields" : null, + "memberSchema" : null + } + }, { + "name" : "f_3", + "schema" : { + "type" : "DOUBLE", + "fields" : null, + "memberSchema" : null + } + }, { + "name" : "f_4", + "schema" : { + "type" : "STRING", + "fields" : null, + "memberSchema" : null + } + }, { + "name" : "f_5", + "schema" : { + "type" : "ARRAY", + "fields" : null, + "memberSchema" : { + "type" : "STRING", + "fields" : null, + "memberSchema" : null + } + } + }, { + "name" : "f_6", + "schema" : { + "type" : "MAP", + "fields" : null, + "memberSchema" : { + "type" : "BIGINT", + "fields" : null, + "memberSchema" : null + } + } + }, { + "name" : "f_7", + "schema" : { + "type" : "STRUCT", + "fields" : [ { + "name" : "a", + "schema" : { + "type" : "DOUBLE", + "fields" : null, + "memberSchema" : null + } + } ], + "memberSchema" : null + } + }, { + "name" : "HEAD", + "schema" : { + "type" : "ARRAY", + "fields" : null, + "memberSchema" : { + "type" : "STRUCT", + "fields" : [ { + "name" : "KEY", + "schema" : { + "type" : "STRING", + "fields" : null, + "memberSchema" : null + } + }, { + "name" : "VALUE", + "schema" : { + "type" : "BYTES", + "fields" : null, + "memberSchema" : null + } + } ], + "memberSchema" : null + } + }, + "type" : "HEADER" + } ], + "type" : "TABLE", + "timestamp" : "2000-01-01", + "statistics" : "The statistics field is deprecated and will be removed in a future version of ksql. Please update your client to the latest version and use statisticsMap instead.\nstats", + "errorStats" : "The errorStats field is deprecated and will be removed in a future version of ksql. Please update your client to the latest version and use errorStatsMap instead.\nerrors\n", + "extended" : true, + "keyFormat" : "kafka", + "valueFormat" : "avro", + "topic" : "kafka-topic", + "partitions" : 1, + "replication" : 1, + "statement" : "sql statement", + "queryOffsetSummaries" : [ ], + "sourceConstraints" : [ ], + "clusterStatistics" : [ ], + "clusterErrorStats" : [ ] + }, + "warnings" : [ ] +} ] diff --git a/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.testPrintSourceDescriptionWithHeaders.approved.tabular b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.testPrintSourceDescriptionWithHeaders.approved.tabular new file mode 100644 index 000000000000..fa0b608ca5cd --- /dev/null +++ b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.testPrintSourceDescriptionWithHeaders.approved.tabular @@ -0,0 +1,43 @@ + +Name : TestSource +Type : TABLE +Timestamp field : 2000-01-01 +Key format : kafka +Value format : avro +Kafka topic : kafka-topic (partitions: 1, replication: 1) +Statement : sql statement + + Field | Type +-------------------------------------------------------------------- + ROWKEY | VARCHAR(STRING) (primary key) + f_0 | BOOLEAN + f_1 | INTEGER + f_2 | BIGINT + f_3 | DOUBLE + f_4 | VARCHAR(STRING) + f_5 | ARRAY + f_6 | MAP + f_7 | STRUCT + HEAD | ARRAY> (headers) +-------------------------------------------------------------------- + +Queries that read from this TABLE +----------------------------------- +readId (ERROR) : read query + +For query topology and execution plan please run: EXPLAIN + +Queries that write from this TABLE +----------------------------------- +writeId (ERROR) : write query + +For query topology and execution plan please run: EXPLAIN + +Local runtime statistics +------------------------ +The statistics field is deprecated and will be removed in a future version of ksql. Please update your client to the latest version and use statisticsMap instead. +stats +The errorStats field is deprecated and will be removed in a future version of ksql. Please update your client to the latest version and use errorStatsMap instead. +errors + +(Statistics of the local KSQL server interaction with the Kafka topic kafka-topic) diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/util/EntityUtil.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/util/EntityUtil.java index 31a49588ab29..7fd0613835c6 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/util/EntityUtil.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/util/EntityUtil.java @@ -19,7 +19,6 @@ import io.confluent.ksql.rest.entity.FieldInfo.FieldType; import io.confluent.ksql.rest.entity.SchemaInfo; import io.confluent.ksql.schema.ksql.Column; -import io.confluent.ksql.schema.ksql.Column.Namespace; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.SqlTypeWalker; import io.confluent.ksql.schema.ksql.types.SqlArray; @@ -55,13 +54,19 @@ public static SchemaInfo schemaInfo(final SqlType type) { } private static FieldInfo toFieldInfo(final Column column) { - return new FieldInfo(column.name().text(), schemaInfo(column.type()), fieldType(column)); + return new FieldInfo( + column.name().text(), schemaInfo(column.type()), fieldType(column), column.headerKey()); } private static Optional fieldType(final Column column) { - return column.namespace() == Namespace.KEY - ? Optional.of(FieldType.KEY) - : Optional.empty(); + switch (column.namespace()) { + case KEY: + return Optional.of(FieldType.KEY); + case HEADERS: + return Optional.of(FieldType.HEADER); + default: + return Optional.empty(); + } } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/util/EntityUtilTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/util/EntityUtilTest.java index 1657f35a4f95..c89c04d2ded4 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/util/EntityUtilTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/util/EntityUtilTest.java @@ -210,6 +210,43 @@ public void shouldSupportSchemasWithKeyColumns() { assertThat(fields.get(0).getType(), equalTo(Optional.of(FieldType.KEY))); } + @Test + public void shouldSupportSchemasWithAllHeadersColumn() { + // Given: + final LogicalSchema schema = LogicalSchema.builder() + .headerColumn(ColumnName.of("field1"), Optional.empty()) + .build(); + + // When: + final List fields = EntityUtil.buildSourceSchemaEntity(schema); + + // Then: + assertThat(fields, hasSize(1)); + assertThat(fields.get(0).getName(), equalTo("field1")); + assertThat(fields.get(0).getSchema().getTypeName(), equalTo("ARRAY")); + assertThat(fields.get(0).getType(), equalTo(Optional.of(FieldType.HEADER))); + assertThat(fields.get(0).getHeaderKey(), equalTo(Optional.empty())); + } + + @Test + public void shouldSupportSchemasWithExtractedHeaderColumns() { + // Given: + final LogicalSchema schema = LogicalSchema.builder() + .headerColumn(ColumnName.of("field1"), Optional.of("abc")) + .build(); + + // When: + final List fields = EntityUtil.buildSourceSchemaEntity(schema); + + // Then: + assertThat(fields, hasSize(1)); + assertThat(fields.get(0).getName(), equalTo("field1")); + assertThat(fields.get(0).getSchema().getTypeName(), equalTo("BYTES")); + assertThat(fields.get(0).getType(), equalTo(Optional.of(FieldType.HEADER))); + assertThat(fields.get(0).getHeaderKey(), equalTo(Optional.of("abc"))); + } + + @Test public void shouldMaintainColumnOrder() { // Given: diff --git a/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/FieldInfo.java b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/FieldInfo.java index cbfd5680d42f..a7c0a1492b2e 100644 --- a/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/FieldInfo.java +++ b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/FieldInfo.java @@ -31,22 +31,34 @@ public class FieldInfo { public enum FieldType { SYSTEM, // To be removed in the future. 0.9 saw this value no longer used. - KEY + KEY, + HEADER } private final String name; private final SchemaInfo schema; private final Optional type; + private final Optional headerKey; + + public FieldInfo( + final String name, + final SchemaInfo schema, + final Optional type + ) { + this(name, schema, type, Optional.empty()); + } @JsonCreator public FieldInfo( @JsonProperty(value = "name", required = true) final String name, @JsonProperty(value = "schema", required = true) final SchemaInfo schema, - @JsonProperty("fieldType") final Optional type + @JsonProperty("fieldType") final Optional type, + @JsonProperty("headerKey") final Optional headerKey ) { this.name = Objects.requireNonNull(name, "name"); this.schema = Objects.requireNonNull(schema, "schema"); this.type = Objects.requireNonNull(type, "type"); + this.headerKey = Objects.requireNonNull(headerKey, "headerKey"); } public String getName() { @@ -61,17 +73,22 @@ public Optional getType() { return type; } + public Optional getHeaderKey() { + return headerKey; + } + @Override public boolean equals(final Object other) { return other instanceof FieldInfo && Objects.equals(name, ((FieldInfo) other).name) && Objects.equals(schema, ((FieldInfo) other).schema) - && Objects.equals(type, ((FieldInfo) other).type); + && Objects.equals(type, ((FieldInfo) other).type) + && Objects.equals(headerKey, ((FieldInfo) other).headerKey); } @Override public int hashCode() { - return Objects.hash(name, schema, type); + return Objects.hash(name, schema, type, headerKey); } @Override @@ -80,6 +97,7 @@ public String toString() { + "name='" + name + '\'' + ", schema=" + schema + ", type=" + type + + ", headerKey=" + headerKey + '}'; } }