
Commit: add unit tests
jeqo committed May 26, 2020
1 parent 9c4aeb8 commit 31548ac
Showing 6 changed files with 205 additions and 32 deletions.
@@ -50,7 +50,7 @@
import io.confluent.ksql.rest.entity.CommandStatusEntity;
import io.confluent.ksql.rest.entity.ConnectorDescription;
import io.confluent.ksql.rest.entity.ConnectorList;
import io.confluent.ksql.rest.entity.ConsumerGroupOffsets;
import io.confluent.ksql.rest.entity.SourceConsumerOffsets;
import io.confluent.ksql.rest.entity.CreateConnectorEntity;
import io.confluent.ksql.rest.entity.DropConnectorEntity;
import io.confluent.ksql.rest.entity.ErrorEntity;
@@ -619,16 +619,16 @@ private void printSourceDescription(final SourceDescription source) {
"Statistics of the local KSQL server interaction with the Kafka topic "
+ source.getTopic()
));
writer().println();
Optional<ConsumerGroupOffsets> consumerGroupOffsetsOptional = source.getConsumerGroupOffsets();
Optional<SourceConsumerOffsets> consumerGroupOffsetsOptional = source.getConsumerGroupOffsets();
if (consumerGroupOffsetsOptional.isPresent()) {
ConsumerGroupOffsets consumerGroupOffsets = consumerGroupOffsetsOptional.get();
writer().println(String.format("%-20s : %s", "Consumer Group", consumerGroupOffsets.getGroupId()));
writer().println(String.format("%-20s : %s", "Kafka topic", consumerGroupOffsets.getKafkaTopic()));
writer().println();
SourceConsumerOffsets sourceConsumerOffsets = consumerGroupOffsetsOptional.get();
writer().println(String.format("%-20s : %s", "Consumer Group", sourceConsumerOffsets.getGroupId()));
writer().println(String.format("%-20s : %s", "Kafka topic", sourceConsumerOffsets.getKafkaTopic()));
writer().println("");
final Table taskTable = new Table.Builder()
.withColumnHeaders(ImmutableList.of("Partition", "Start Offset", "End Offset", "Offset", "Lag"))
.withRows(consumerGroupOffsets.getOffsets()
.withRows(sourceConsumerOffsets.getOffsets()
.stream()
.map(offset -> ImmutableList.of(
String.valueOf(offset.getPartition()),
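The mapper that builds each printed row is truncated in the hunk above. As a reference only, here is a minimal sketch of how one row could be derived from a SourceConsumerOffset, assuming the entity exposes getters for the fields declared later in this diff and that the Lag column is simply the end offset minus the committed offset:

// Sketch only - not the code hidden by the truncated hunk. Assumes Guava's ImmutableList and
// getLogStartOffset()/getLogEndOffset()/getConsumerOffset() getters on SourceConsumerOffset.
private static List<String> toRow(final SourceConsumerOffset offset) {
  final long lag = offset.getLogEndOffset() - offset.getConsumerOffset(); // e.g. 900 - 800 = 100
  return ImmutableList.of(
      String.valueOf(offset.getPartition()),
      String.valueOf(offset.getLogStartOffset()),
      String.valueOf(offset.getLogEndOffset()),
      String.valueOf(offset.getConsumerOffset()),
      String.valueOf(lag));
}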
@@ -67,6 +67,8 @@
import io.confluent.ksql.rest.entity.RunningQuery;
import io.confluent.ksql.rest.entity.SchemaInfo;
import io.confluent.ksql.rest.entity.SimpleConnectorInfo;
import io.confluent.ksql.rest.entity.SourceConsumerOffset;
import io.confluent.ksql.rest.entity.SourceConsumerOffsets;
import io.confluent.ksql.rest.entity.SourceDescription;
import io.confluent.ksql.rest.entity.SourceDescriptionEntity;
import io.confluent.ksql.rest.entity.SourceInfo;
@@ -631,7 +633,8 @@ public void testPrintSourceDescription() {
+ " \"topic\" : \"kadka-topic\"," + NEWLINE
+ " \"partitions\" : 1," + NEWLINE
+ " \"replication\" : 1," + NEWLINE
+ " \"statement\" : \"sql statement\"" + NEWLINE
+ " \"statement\" : \"sql statement\"," + NEWLINE
+ " \"consumerGroupOffsets\" : null" + NEWLINE
+ " }," + NEWLINE
+ " \"warnings\" : [ ]" + NEWLINE
+ "} ]" + NEWLINE));
@@ -769,7 +772,8 @@ public void testPrintConnectorDescription() {
+ " \"topic\" : \"kadka-topic\"," + NEWLINE
+ " \"partitions\" : 2," + NEWLINE
+ " \"replication\" : 1," + NEWLINE
+ " \"statement\" : \"statement\"" + NEWLINE
+ " \"statement\" : \"statement\"," + NEWLINE
+ " \"consumerGroupOffsets\" : null" + NEWLINE
+ " } ]," + NEWLINE
+ " \"topics\" : [ \"a-jdbc-topic\" ]," + NEWLINE
+ " \"warnings\" : [ ]" + NEWLINE
@@ -1120,6 +1124,174 @@ public void testPrintExecuptionPlan() {
}
}

@Test
public void shouldPrintTopicDescribeExtendedWithConsumerOffsets() {
// Given:
final List<RunningQuery> readQueries = ImmutableList.of(
new RunningQuery("read query", ImmutableSet.of("sink1"), ImmutableSet.of("sink1 topic"), new QueryId("readId"), queryStatusCount, KsqlConstants.KsqlQueryType.PERSISTENT)
);
final List<RunningQuery> writeQueries = ImmutableList.of(
new RunningQuery("write query", ImmutableSet.of("sink2"), ImmutableSet.of("sink2 topic"), new QueryId("writeId"), queryStatusCount, KsqlConstants.KsqlQueryType.PERSISTENT)
);

final KsqlEntityList entityList = new KsqlEntityList(ImmutableList.of(
new SourceDescriptionEntity(
"e",
new SourceDescription(
"TestSource",
Optional.empty(),
readQueries,
writeQueries,
buildTestSchema(SqlTypes.STRING),
DataSourceType.KTABLE.getKsqlType(),
"2000-01-01",
"stats",
"errors",
true,
"kafka",
"avro",
"kadka-topic",
2, 1,
"sql statement text",
Optional.of(
new SourceConsumerOffsets(
"consumer1",
"kadka-topic",
ImmutableList.of(
new SourceConsumerOffset(0, 100, 900, 800),
new SourceConsumerOffset(1, 50, 900, 900)
))
)),
Collections.emptyList()
))
);

// When:
console.printKsqlEntityList(entityList);

// Then:
final String output = terminal.getOutputString();
if (console.getOutputFormat() == OutputFormat.JSON) {
assertThat(output, is("[ {" + NEWLINE
+ " \"@type\" : \"sourceDescription\"," + NEWLINE
+ " \"statementText\" : \"e\"," + NEWLINE
+ " \"sourceDescription\" : {" + NEWLINE
+ " \"name\" : \"TestSource\"," + NEWLINE
+ " \"windowType\" : null," + NEWLINE
+ " \"readQueries\" : [ {" + NEWLINE
+ " \"queryString\" : \"read query\"," + NEWLINE
+ " \"sinks\" : [ \"sink1\" ]," + NEWLINE
+ " \"sinkKafkaTopics\" : [ \"sink1 topic\" ]," + NEWLINE
+ " \"id\" : \"readId\"," + NEWLINE
+ " \"statusCount\" : {" + NEWLINE
+ " \"RUNNING\" : 1," + NEWLINE
+ " \"ERROR\" : 2" + NEWLINE
+ " }," + NEWLINE
+ " \"queryType\" : \"PERSISTENT\"," + NEWLINE
+ " \"state\" : \"" + AGGREGATE_STATUS +"\"" + NEWLINE
+ " } ]," + NEWLINE
+ " \"writeQueries\" : [ {" + NEWLINE
+ " \"queryString\" : \"write query\"," + NEWLINE
+ " \"sinks\" : [ \"sink2\" ]," + NEWLINE
+ " \"sinkKafkaTopics\" : [ \"sink2 topic\" ]," + NEWLINE
+ " \"id\" : \"writeId\"," + NEWLINE
+ " \"statusCount\" : {" + NEWLINE
+ " \"RUNNING\" : 1," + NEWLINE
+ " \"ERROR\" : 2" + NEWLINE
+ " }," + NEWLINE
+ " \"queryType\" : \"PERSISTENT\"," + NEWLINE
+ " \"state\" : \"" + AGGREGATE_STATUS +"\"" + NEWLINE
+ " } ]," + NEWLINE
+ " \"fields\" : [ {" + NEWLINE
+ " \"name\" : \"ROWKEY\"," + NEWLINE
+ " \"schema\" : {" + NEWLINE
+ " \"type\" : \"STRING\"," + NEWLINE
+ " \"fields\" : null," + NEWLINE
+ " \"memberSchema\" : null" + NEWLINE
+ " }," + NEWLINE
+ " \"type\" : \"KEY\"" + NEWLINE
+ " }, {" + NEWLINE
+ " \"name\" : \"f_0\"," + NEWLINE
+ " \"schema\" : {" + NEWLINE
+ " \"type\" : \"STRING\"," + NEWLINE
+ " \"fields\" : null," + NEWLINE
+ " \"memberSchema\" : null" + NEWLINE
+ " }" + NEWLINE
+ " } ]," + NEWLINE
+ " \"type\" : \"TABLE\"," + NEWLINE
+ " \"timestamp\" : \"2000-01-01\"," + NEWLINE
+ " \"statistics\" : \"stats\"," + NEWLINE
+ " \"errorStats\" : \"errors\"," + NEWLINE
+ " \"extended\" : true," + NEWLINE
+ " \"keyFormat\" : \"kafka\"," + NEWLINE
+ " \"valueFormat\" : \"avro\"," + NEWLINE
+ " \"topic\" : \"kadka-topic\"," + NEWLINE
+ " \"partitions\" : 2," + NEWLINE
+ " \"replication\" : 1," + NEWLINE
+ " \"statement\" : \"sql statement text\"," + NEWLINE
+ " \"consumerGroupOffsets\" : {" + NEWLINE
+ " \"groupId\" : \"consumer1\"," + NEWLINE
+ " \"kafkaTopic\" : \"kadka-topic\"," + NEWLINE
+ " \"offsets\" : [ {" + NEWLINE
+ " \"partition\" : 0," + NEWLINE
+ " \"logStartOffset\" : 100," + NEWLINE
+ " \"logEndOffset\" : 900," + NEWLINE
+ " \"consumerOffset\" : 800" + NEWLINE
+ " }, {" + NEWLINE
+ " \"partition\" : 1," + NEWLINE
+ " \"logStartOffset\" : 50," + NEWLINE
+ " \"logEndOffset\" : 900," + NEWLINE
+ " \"consumerOffset\" : 900" + NEWLINE
+ " } ]" + NEWLINE
+ " }" + NEWLINE
+ " }," + NEWLINE
+ " \"warnings\" : [ ]" + NEWLINE
+ "} ]" + NEWLINE));
} else {
assertThat(output, is("" + NEWLINE
+ "Name : TestSource" + NEWLINE
+ "Type : TABLE" + NEWLINE
+ "Timestamp field : 2000-01-01" + NEWLINE
+ "Key format : kafka" + NEWLINE
+ "Value format : avro" + NEWLINE
+ "Kafka topic : kadka-topic (partitions: 2, replication: 1)" + NEWLINE
+ "Statement : sql statement text" + NEWLINE
+ "" + NEWLINE
+ " Field | Type " + NEWLINE
+ "-----------------------------------------" + NEWLINE
+ " ROWKEY | VARCHAR(STRING) (primary key) " + NEWLINE
+ " f_0 | VARCHAR(STRING) " + NEWLINE
+ "-----------------------------------------" + NEWLINE
+ "" + NEWLINE
+ "Queries that read from this TABLE" + NEWLINE
+ "-----------------------------------" + NEWLINE
+ "readId (" + AGGREGATE_STATUS +") : read query" + NEWLINE
+ "\n"
+ "For query topology and execution plan please run: EXPLAIN <QueryId>" + NEWLINE
+ "" + NEWLINE
+ "Queries that write from this TABLE" + NEWLINE
+ "-----------------------------------" + NEWLINE
+ "writeId (" + AGGREGATE_STATUS + ") : write query" + NEWLINE
+ "\n"
+ "For query topology and execution plan please run: EXPLAIN <QueryId>" + NEWLINE
+ "" + NEWLINE
+ "Local runtime statistics" + NEWLINE
+ "------------------------" + NEWLINE
+ "stats" + NEWLINE
+ "errors" + NEWLINE
+ "(Statistics of the local KSQL server interaction with the Kafka topic kadka-topic)" + NEWLINE
+ NEWLINE
+ "Consumer Group : consumer1" + NEWLINE
+ "Kafka topic : kadka-topic" + NEWLINE
+ NEWLINE
+ " Partition | Start Offset | End Offset | Offset | Lag " + NEWLINE
+ "------------------------------------------------------" + NEWLINE
+ " 0 | 100 | 900 | 800 | 100 " + NEWLINE
+ " 1 | 50 | 900 | 900 | 0 " + NEWLINE
+ "------------------------------------------------------" + NEWLINE));
}
}

@Test
public void shouldPrintTopicDescribeExtended() {
// Given:
@@ -1216,7 +1388,8 @@ public void shouldPrintTopicDescribeExtended() {
+ " \"topic\" : \"kadka-topic\"," + NEWLINE
+ " \"partitions\" : 2," + NEWLINE
+ " \"replication\" : 1," + NEWLINE
+ " \"statement\" : \"sql statement text\"" + NEWLINE
+ " \"statement\" : \"sql statement text\"," + NEWLINE
+ " \"consumerGroupOffsets\" : null" + NEWLINE
+ " }," + NEWLINE
+ " \"warnings\" : [ ]" + NEWLINE
+ "} ]" + NEWLINE));
@@ -71,32 +71,32 @@ public static SourceDescription create(
topicDescription.map(td -> td.partitions().get(0).replicas().size()).orElse(0),
dataSource.getSqlExpression(),
topicDescription.flatMap(td -> consumerGroupDescription.map(cg ->
new ConsumerGroupOffsets(cg.groupId(), td.name(),
new SourceConsumerOffsets(cg.groupId(), td.name(),
consumerOffsets(td, topicAndStartOffsets, topicAndEndOffsets,
topicAndConsumerOffsets)))));
}

private static List<ConsumerOffset> consumerOffsets(
private static List<SourceConsumerOffset> consumerOffsets(
final TopicDescription topicDescription,
final Map<TopicPartition, ListOffsetsResultInfo> topicAndStartOffsets,
final Map<TopicPartition, ListOffsetsResultInfo> topicAndEndOffsets,
final Map<TopicPartition, OffsetAndMetadata> topicAndConsumerOffsets
) {
List<ConsumerOffset> consumerOffsets = new ArrayList<>();
List<SourceConsumerOffset> sourceConsumerOffsets = new ArrayList<>();
for (TopicPartitionInfo topicPartitionInfo : topicDescription.partitions()) {
final TopicPartition tp = new TopicPartition(topicDescription.name(),
topicPartitionInfo.partition());
ListOffsetsResultInfo startOffsetResultInfo = topicAndStartOffsets.get(tp);
ListOffsetsResultInfo endOffsetResultInfo = topicAndEndOffsets.get(tp);
OffsetAndMetadata offsetAndMetadata = topicAndConsumerOffsets.get(tp);
consumerOffsets.add(
new ConsumerOffset(
sourceConsumerOffsets.add(
new SourceConsumerOffset(
topicPartitionInfo.partition(),
startOffsetResultInfo != null ? startOffsetResultInfo.offset() : 0,
endOffsetResultInfo != null ? endOffsetResultInfo.offset() : 0,
offsetAndMetadata != null ? offsetAndMetadata.offset() : 0
));
}
return consumerOffsets;
return sourceConsumerOffsets;
}
}
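consumerOffsets(...) only assembles the entity; the per-partition offset maps it receives are fetched by the caller. A hedged sketch of how those maps could be obtained with the standard Kafka Admin client (admin, props, topic and groupId are placeholder names, and the blocking get() calls would need InterruptedException/ExecutionException handling):

// Hypothetical caller, not part of this commit; shown only to illustrate where the maps come from.
final Admin admin = Admin.create(props);
final TopicDescription td =
    admin.describeTopics(ImmutableList.of(topic)).all().get().get(topic);
final Map<TopicPartition, OffsetSpec> earliest = new HashMap<>();
final Map<TopicPartition, OffsetSpec> latest = new HashMap<>();
for (final TopicPartitionInfo p : td.partitions()) {
  final TopicPartition tp = new TopicPartition(topic, p.partition());
  earliest.put(tp, OffsetSpec.earliest());
  latest.put(tp, OffsetSpec.latest());
}
// Same shapes as the parameters of consumerOffsets(...):
final Map<TopicPartition, ListOffsetsResultInfo> startOffsets = admin.listOffsets(earliest).all().get();
final Map<TopicPartition, ListOffsetsResultInfo> endOffsets = admin.listOffsets(latest).all().get();
final Map<TopicPartition, OffsetAndMetadata> committed =
    admin.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get();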
@@ -6,15 +6,15 @@
import java.util.Objects;

@JsonIgnoreProperties(ignoreUnknown = true)
public class ConsumerOffset {
public class SourceConsumerOffset {

private final int partition;
private final long logStartOffset;
private final long logEndOffset;
private final long consumerOffset;

@JsonCreator
public ConsumerOffset(
public SourceConsumerOffset(
@JsonProperty("partition") int partition,
@JsonProperty("logStartOffset") long logStartOffset,
@JsonProperty("logEndOffset") long logEndOffset,
@@ -50,7 +50,7 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}
ConsumerOffset that = (ConsumerOffset) o;
SourceConsumerOffset that = (SourceConsumerOffset) o;
return partition == that.partition &&
logStartOffset == that.logStartOffset &&
logEndOffset == that.logEndOffset &&
@@ -7,16 +7,16 @@
import java.util.Objects;

@JsonIgnoreProperties(ignoreUnknown = true)
public class ConsumerGroupOffsets {
public class SourceConsumerOffsets {
private final String groupId;
private final String kafkaTopic;
private final List<ConsumerOffset> offsets;
private final List<SourceConsumerOffset> offsets;

@JsonCreator
public ConsumerGroupOffsets(
public SourceConsumerOffsets(
@JsonProperty("groupId") String groupId,
@JsonProperty("kafkaTopic") String kafkaTopic,
@JsonProperty("offsets") List<ConsumerOffset> offsets
@JsonProperty("offsets") List<SourceConsumerOffset> offsets
) {
this.groupId = groupId;
this.kafkaTopic = kafkaTopic;
@@ -31,7 +31,7 @@ public String getKafkaTopic() {
return kafkaTopic;
}

public List<ConsumerOffset> getOffsets() {
public List<SourceConsumerOffset> getOffsets() {
return offsets;
}

@@ -43,7 +43,7 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}
ConsumerGroupOffsets that = (ConsumerGroupOffsets) o;
SourceConsumerOffsets that = (SourceConsumerOffsets) o;
return Objects.equals(groupId, that.groupId) &&
Objects.equals(kafkaTopic, that.kafkaTopic) &&
Objects.equals(offsets, that.offsets);
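Because the @JsonProperty names are unchanged by the rename, the entity keeps the same wire shape the CLI test asserts above. A small Jackson round-trip sketch, assuming standard ObjectMapper usage and the getters implied by the fields:

// Hypothetical round-trip, not part of this commit; field names come from the @JsonProperty annotations.
final ObjectMapper mapper = new ObjectMapper();
final SourceConsumerOffsets offsets = new SourceConsumerOffsets(
    "consumer1",
    "kadka-topic",
    ImmutableList.of(new SourceConsumerOffset(0, 100, 900, 800)));
final String json = mapper.writeValueAsString(offsets);
// json (approximately): {"groupId":"consumer1","kafkaTopic":"kadka-topic",
//   "offsets":[{"partition":0,"logStartOffset":100,"logEndOffset":900,"consumerOffset":800}]}
final SourceConsumerOffsets parsed = mapper.readValue(json, SourceConsumerOffsets.class);
// parsed.equals(offsets) should hold, given the equals(...) defined in these classes.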
(The diff for the sixth changed file did not load and is not shown here.)
