feat: indicate header columns in source descriptions (#8475)
* feat: indicate header columns in source descriptions

* address review comments
Zara Lim authored Dec 10, 2021
1 parent 0c7da2e commit 065de82
Showing 9 changed files with 548 additions and 12 deletions.
@@ -503,6 +503,13 @@ private static String formatFieldType(
) {
final FieldType possibleFieldType = field.getType().orElse(null);

if (possibleFieldType == FieldType.HEADER) {
final String headerType = field.getHeaderKey()
.map(k -> "(header('" + k + "'))")
.orElse("(headers)");
return String.format("%-16s %s", field.getSchema().toTypeString(), headerType);
}

if (possibleFieldType == FieldType.KEY) {
final String wt = windowType
.map(v -> " (Window type: " + v + ")")
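For context (not part of the diff): a minimal sketch of what the new HEADER branch above produces. The literal type string "BYTES" and header key "abc" are illustrative values taken from the approval output further down; the real code derives them from the field's schema and getHeaderKey().

// Minimal standalone sketch (illustrative, not committed code) of the new
// HEADER formatting: the type string is left-padded to a 16-character field
// and followed by either a specific header-key marker or a generic
// "(headers)" marker when no key is supplied.
public class HeaderTypeFormatSketch {
  public static void main(String[] args) {
    // Column extracting a single Kafka record header with key "abc":
    System.out.println(String.format("%-16s %s", "BYTES", "(header('abc'))"));
    // Column capturing all record headers (no key supplied):
    System.out.println(String.format("%-16s %s", "BYTES", "(headers)"));
  }
}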
@@ -187,17 +187,29 @@ public static Collection<OutputFormat> data() {
return ImmutableList.of(OutputFormat.JSON, OutputFormat.TABULAR);
}

private static List<FieldInfo> buildTestSchema(final Optional<String> headerKey, final SqlType... fieldTypes) {
final LogicalSchema schema = builderWithoutHeaders(fieldTypes)
.headerColumn(ColumnName.of("HEAD"), headerKey)
.build();

return EntityUtil.buildSourceSchemaEntity(schema);
}

private static List<FieldInfo> buildTestSchema(final SqlType... fieldTypes) {
final LogicalSchema schema = builderWithoutHeaders(fieldTypes).build();

return EntityUtil.buildSourceSchemaEntity(schema);
}

private static Builder builderWithoutHeaders(final SqlType... fieldTypes) {
final Builder schemaBuilder = LogicalSchema.builder()
.keyColumn(SystemColumns.ROWKEY_NAME, SqlTypes.STRING);

for (int idx = 0; idx < fieldTypes.length; idx++) {
schemaBuilder.valueColumn(ColumnName.of("f_" + idx), fieldTypes[idx]);
}

final LogicalSchema schema = schemaBuilder.build();

return EntityUtil.buildSourceSchemaEntity(schema);
return schemaBuilder;
}

@Before
@@ -468,6 +480,86 @@ public void testPrintSourceDescription() {
Approvals.verify(output, approvalOptions);
}

@Test
public void testPrintSourceDescriptionWithHeaders() {
// Given:
final List<FieldInfo> fields = buildTestSchema(
Optional.empty(),
SqlTypes.BOOLEAN,
SqlTypes.INTEGER,
SqlTypes.BIGINT,
SqlTypes.DOUBLE,
SqlTypes.STRING,
SqlTypes.array(SqlTypes.STRING),
SqlTypes.map(SqlTypes.STRING, SqlTypes.BIGINT),
SqlTypes.struct()
.field("a", SqlTypes.DOUBLE)
.build()
);

final List<RunningQuery> readQueries = ImmutableList.of(
new RunningQuery("read query", ImmutableSet.of("sink1"), ImmutableSet.of("sink1 topic"), new QueryId("readId"), queryStatusCount, KsqlConstants.KsqlQueryType.PERSISTENT)
);
final List<RunningQuery> writeQueries = ImmutableList.of(
new RunningQuery("write query", ImmutableSet.of("sink2"), ImmutableSet.of("sink2 topic"), new QueryId("writeId"), queryStatusCount, KsqlConstants.KsqlQueryType.PERSISTENT)
);

final KsqlEntityList entityList = new KsqlEntityList(ImmutableList.of(
new SourceDescriptionEntity(
"some sql",
buildSourceDescription(readQueries, writeQueries, fields, false),
Collections.emptyList()
)
));

// When:
console.printKsqlEntityList(entityList);

// Then:
final String output = terminal.getOutputString();
Approvals.verify(output, approvalOptions);
}

@Test
public void testPrintSourceDescriptionWithExtractedHeader() {
// Given:
final List<FieldInfo> fields = buildTestSchema(
Optional.of("abc"),
SqlTypes.BOOLEAN,
SqlTypes.INTEGER,
SqlTypes.BIGINT,
SqlTypes.DOUBLE,
SqlTypes.STRING,
SqlTypes.array(SqlTypes.STRING),
SqlTypes.map(SqlTypes.STRING, SqlTypes.BIGINT),
SqlTypes.struct()
.field("a", SqlTypes.DOUBLE)
.build()
);

final List<RunningQuery> readQueries = ImmutableList.of(
new RunningQuery("read query", ImmutableSet.of("sink1"), ImmutableSet.of("sink1 topic"), new QueryId("readId"), queryStatusCount, KsqlConstants.KsqlQueryType.PERSISTENT)
);
final List<RunningQuery> writeQueries = ImmutableList.of(
new RunningQuery("write query", ImmutableSet.of("sink2"), ImmutableSet.of("sink2 topic"), new QueryId("writeId"), queryStatusCount, KsqlConstants.KsqlQueryType.PERSISTENT)
);

final KsqlEntityList entityList = new KsqlEntityList(ImmutableList.of(
new SourceDescriptionEntity(
"some sql",
buildSourceDescription(readQueries, writeQueries, fields, false),
Collections.emptyList()
)
));

// When:
console.printKsqlEntityList(entityList);

// Then:
final String output = terminal.getOutputString();
Approvals.verify(output, approvalOptions);
}

@Test
public void testPrintSourceDescriptionWithClusterStats() {
// Given:
@@ -0,0 +1,137 @@
[ {
"@type" : "sourceDescription",
"statementText" : "some sql",
"sourceDescription" : {
"name" : "TestSource",
"windowType" : null,
"readQueries" : [ {
"queryString" : "read query",
"sinks" : [ "sink1" ],
"sinkKafkaTopics" : [ "sink1 topic" ],
"id" : "readId",
"statusCount" : {
"RUNNING" : 1,
"ERROR" : 2
},
"queryType" : "PERSISTENT",
"state" : "ERROR"
} ],
"writeQueries" : [ {
"queryString" : "write query",
"sinks" : [ "sink2" ],
"sinkKafkaTopics" : [ "sink2 topic" ],
"id" : "writeId",
"statusCount" : {
"RUNNING" : 1,
"ERROR" : 2
},
"queryType" : "PERSISTENT",
"state" : "ERROR"
} ],
"fields" : [ {
"name" : "ROWKEY",
"schema" : {
"type" : "STRING",
"fields" : null,
"memberSchema" : null
},
"type" : "KEY"
}, {
"name" : "f_0",
"schema" : {
"type" : "BOOLEAN",
"fields" : null,
"memberSchema" : null
}
}, {
"name" : "f_1",
"schema" : {
"type" : "INTEGER",
"fields" : null,
"memberSchema" : null
}
}, {
"name" : "f_2",
"schema" : {
"type" : "BIGINT",
"fields" : null,
"memberSchema" : null
}
}, {
"name" : "f_3",
"schema" : {
"type" : "DOUBLE",
"fields" : null,
"memberSchema" : null
}
}, {
"name" : "f_4",
"schema" : {
"type" : "STRING",
"fields" : null,
"memberSchema" : null
}
}, {
"name" : "f_5",
"schema" : {
"type" : "ARRAY",
"fields" : null,
"memberSchema" : {
"type" : "STRING",
"fields" : null,
"memberSchema" : null
}
}
}, {
"name" : "f_6",
"schema" : {
"type" : "MAP",
"fields" : null,
"memberSchema" : {
"type" : "BIGINT",
"fields" : null,
"memberSchema" : null
}
}
}, {
"name" : "f_7",
"schema" : {
"type" : "STRUCT",
"fields" : [ {
"name" : "a",
"schema" : {
"type" : "DOUBLE",
"fields" : null,
"memberSchema" : null
}
} ],
"memberSchema" : null
}
}, {
"name" : "HEAD",
"schema" : {
"type" : "BYTES",
"fields" : null,
"memberSchema" : null
},
"headerKey" : "abc",
"type" : "HEADER"
} ],
"type" : "TABLE",
"timestamp" : "2000-01-01",
"statistics" : "The statistics field is deprecated and will be removed in a future version of ksql. Please update your client to the latest version and use statisticsMap instead.\nstats",
"errorStats" : "The errorStats field is deprecated and will be removed in a future version of ksql. Please update your client to the latest version and use errorStatsMap instead.\nerrors\n",
"extended" : true,
"keyFormat" : "kafka",
"valueFormat" : "avro",
"topic" : "kafka-topic",
"partitions" : 1,
"replication" : 1,
"statement" : "sql statement",
"queryOffsetSummaries" : [ ],
"sourceConstraints" : [ ],
"clusterStatistics" : [ ],
"clusterErrorStats" : [ ]
},
"warnings" : [ ]
} ]
@@ -0,0 +1,43 @@

Name : TestSource
Type : TABLE
Timestamp field : 2000-01-01
Key format : kafka
Value format : avro
Kafka topic : kafka-topic (partitions: 1, replication: 1)
Statement : sql statement

Field | Type
-------------------------------------------
ROWKEY | VARCHAR(STRING) (primary key)
f_0 | BOOLEAN
f_1 | INTEGER
f_2 | BIGINT
f_3 | DOUBLE
f_4 | VARCHAR(STRING)
f_5 | ARRAY<VARCHAR(STRING)>
f_6 | MAP<STRING, BIGINT>
f_7 | STRUCT<a DOUBLE>
HEAD | BYTES (header('abc'))
-------------------------------------------

Queries that read from this TABLE
-----------------------------------
readId (ERROR) : read query

For query topology and execution plan please run: EXPLAIN <QueryId>

Queries that write from this TABLE
-----------------------------------
writeId (ERROR) : write query

For query topology and execution plan please run: EXPLAIN <QueryId>

Local runtime statistics
------------------------
The statistics field is deprecated and will be removed in a future version of ksql. Please update your client to the latest version and use statisticsMap instead.
stats
The errorStats field is deprecated and will be removed in a future version of ksql. Please update your client to the latest version and use errorStatsMap instead.
errors

(Statistics of the local KSQL server interaction with the Kafka topic kafka-topic)