Commit f55b82a

feature working

jeqo committed May 26, 2020
1 parent 434680f commit f55b82a
Showing 12 changed files with 103 additions and 36 deletions.
@@ -50,6 +50,7 @@
import io.confluent.ksql.rest.entity.CommandStatusEntity;
import io.confluent.ksql.rest.entity.ConnectorDescription;
import io.confluent.ksql.rest.entity.ConnectorList;
import io.confluent.ksql.rest.entity.ConsumerGroupOffsets;
import io.confluent.ksql.rest.entity.CreateConnectorEntity;
import io.confluent.ksql.rest.entity.DropConnectorEntity;
import io.confluent.ksql.rest.entity.ErrorEntity;
@@ -618,6 +619,24 @@ private void printSourceDescription(final SourceDescription source) {
"Statistics of the local KSQL server interaction with the Kafka topic "
+ source.getTopic()
));
writer().println();
ConsumerGroupOffsets consumerGroupOffsets = source.getConsumerGroupOffsets();
writer().println(String.format("%-20s : %s", "Consumer Group", consumerGroupOffsets.getGroupId()));
writer().println(String.format("%-20s : %s", "Kafka topic", consumerGroupOffsets.getKafkaTopic()));
writer().println("");
final Table taskTable = new Table.Builder()
.withColumnHeaders(ImmutableList.of("Partition", "Start Offset", "End Offset", "Offset", "Lag"))
.withRows(consumerGroupOffsets.getOffsets()
.stream()
.map(offset -> ImmutableList.of(
String.valueOf(offset.getPartition()),
String.valueOf(offset.getLogStartOffset()),
String.valueOf(offset.getLogEndOffset()),
String.valueOf(offset.getConsumerOffset()),
String.valueOf(offset.getLogEndOffset() - offset.getConsumerOffset())
)))
.build();
taskTable.print(this);
}
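
For reference, a sketch of how this new block of DESCRIBE EXTENDED output might render, assuming a two-partition topic; the group id, topic name, and all offsets are illustrative, and the exact borders depend on the ksql Table implementation:

Consumer Group       : _confluent-ksql-default_query_CSAS_PAGEVIEWS_0
Kafka topic          : pageviews

 Partition | Start Offset | End Offset | Offset | Lag
------------------------------------------------------
 0         | 0            | 1042       | 1040   | 2
 1         | 0            | 998        | 998    | 0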

private void printSourceDescriptionList(final SourceDescriptionList sourceDescriptionList) {
@@ -67,7 +67,7 @@
import io.confluent.ksql.rest.entity.RunningQuery;
import io.confluent.ksql.rest.entity.SchemaInfo;
import io.confluent.ksql.rest.entity.SimpleConnectorInfo;
import io.confluent.ksql.rest.entity.SourceConsumerGroupOffsets;
import io.confluent.ksql.rest.entity.ConsumerGroupOffsets;
import io.confluent.ksql.rest.entity.SourceDescription;
import io.confluent.ksql.rest.entity.SourceDescriptionEntity;
import io.confluent.ksql.rest.entity.SourceInfo;
@@ -142,7 +142,7 @@ public class ConsoleTest {
2,
1,
"statement",
new SourceConsumerGroupOffsets("", "kadka-topic", Collections.emptyList()));
new ConsumerGroupOffsets("", "kadka-topic", Collections.emptyList()));

@Mock
private QueryStatusCount queryStatusCount;
@@ -501,7 +501,7 @@ public void testPrintSourceDescription() {
1,
1,
"sql statement",
new SourceConsumerGroupOffsets("", "kadka-topic", Collections.emptyList())),
new ConsumerGroupOffsets("", "kadka-topic", Collections.emptyList())),
Collections.emptyList()
)
));
@@ -1150,7 +1150,7 @@ public void shouldPrintTopicDescribeExtended() {
"kadka-topic",
2, 1,
"sql statement text",
new SourceConsumerGroupOffsets("", "kadka-topic", Collections.emptyList())),
new ConsumerGroupOffsets("", "kadka-topic", Collections.emptyList())),
Collections.emptyList()
))
);
@@ -30,6 +30,7 @@
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;

public final class SourceDescriptionFactory {

@@ -43,6 +44,7 @@ public static SourceDescription create(
final List<RunningQuery> writeQueries,
final Optional<TopicDescription> topicDescription,
final Optional<ConsumerGroupDescription> consumerGroupDescription,
final Map<TopicPartition, ListOffsetsResultInfo> topicAndStartOffsets,
final Map<TopicPartition, ListOffsetsResultInfo> topicAndEndOffsets,
final Map<TopicPartition, OffsetAndMetadata> topicAndConsumerOffsets
) {
@@ -69,21 +71,38 @@ public static SourceDescription create(
topicDescription.map(td -> td.partitions().size()).orElse(0),
topicDescription.map(td -> td.partitions().get(0).replicas().size()).orElse(0),
dataSource.getSqlExpression(),
consumerGroupDescription.map(
cg -> new SourceConsumerGroupOffsets(cg.groupId(), dataSource.getKafkaTopicName(),
consumerOffsets(topicAndEndOffsets, topicAndConsumerOffsets)))
.orElse(new SourceConsumerGroupOffsets("", dataSource.getKafkaTopicName(), Collections
.emptyList())));
topicDescription.map(td ->
consumerGroupDescription.map(
cg -> new ConsumerGroupOffsets(
cg.groupId(),
dataSource.getKafkaTopicName(),
consumerOffsets(td, topicAndStartOffsets, topicAndEndOffsets,
topicAndConsumerOffsets)))
.orElse(
new ConsumerGroupOffsets("", "", Collections.emptyList())))
.orElse(new ConsumerGroupOffsets("", "", Collections.emptyList())));
}
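
The nested Optional.map(...).orElse(...) chain above falls back to an empty ConsumerGroupOffsets whenever the topic description or the consumer group description is missing. A flatter equivalent — a sketch only, assuming this file's imports and the consumerOffsets helper below — would make the two fallbacks explicit:

private static ConsumerGroupOffsets consumerGroupOffsetsOrEmpty(
    final DataSource dataSource,
    final Optional<TopicDescription> topicDescription,
    final Optional<ConsumerGroupDescription> consumerGroupDescription,
    final Map<TopicPartition, ListOffsetsResultInfo> topicAndStartOffsets,
    final Map<TopicPartition, ListOffsetsResultInfo> topicAndEndOffsets,
    final Map<TopicPartition, OffsetAndMetadata> topicAndConsumerOffsets
) {
  // Either description missing: report an empty consumer group section.
  if (!topicDescription.isPresent() || !consumerGroupDescription.isPresent()) {
    return new ConsumerGroupOffsets("", "", Collections.emptyList());
  }
  return new ConsumerGroupOffsets(
      consumerGroupDescription.get().groupId(),
      dataSource.getKafkaTopicName(),
      consumerOffsets(topicDescription.get(), topicAndStartOffsets,
          topicAndEndOffsets, topicAndConsumerOffsets));
}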

private static List<SourceConsumerOffset> consumerOffsets(
private static List<ConsumerOffset> consumerOffsets(
final TopicDescription topicDescription,
final Map<TopicPartition, ListOffsetsResultInfo> topicAndStartOffsets,
final Map<TopicPartition, ListOffsetsResultInfo> topicAndEndOffsets,
final Map<TopicPartition, OffsetAndMetadata> topicAndConsumerOffsets
) {
List<SourceConsumerOffset> consumerOffsets = new ArrayList<>();
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : topicAndConsumerOffsets.entrySet()) {
consumerOffsets.add(new SourceConsumerOffset(entry.getKey().partition(),
topicAndEndOffsets.get(entry.getKey()).offset(), entry.getValue().offset()));
List<ConsumerOffset> consumerOffsets = new ArrayList<>();
for (TopicPartitionInfo topicPartitionInfo : topicDescription.partitions()) {
final TopicPartition tp = new TopicPartition(topicDescription.name(),
topicPartitionInfo.partition());
ListOffsetsResultInfo startOffsetResultInfo = topicAndStartOffsets.get(tp);
ListOffsetsResultInfo endOffsetResultInfo = topicAndEndOffsets.get(tp);
OffsetAndMetadata offsetAndMetadata = topicAndConsumerOffsets.get(tp);
consumerOffsets.add(
new ConsumerOffset(
topicPartitionInfo.partition(),
startOffsetResultInfo != null ? startOffsetResultInfo.offset() : 0,
endOffsetResultInfo != null ? endOffsetResultInfo.offset() : 0,
offsetAndMetadata != null ? offsetAndMetadata.offset() : 0
));
}
return consumerOffsets;
}
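
Downstream, the CLI computes the Lag column as getLogEndOffset() - getConsumerOffset() (see the Console change above). A tiny sketch of that arithmetic against this entity — values illustrative — also shows what the zero defaults here imply: a partition with no committed offset yet reports a lag equal to its end offset, which may overstate how far behind the consumer actually is.

import io.confluent.ksql.rest.entity.ConsumerOffset;

public final class LagMathExample {
  public static void main(final String[] args) {
    // Constructor order: partition, logStartOffset, logEndOffset, consumerOffset.
    final ConsumerOffset committed = new ConsumerOffset(0, 0L, 1042L, 1040L);
    final ConsumerOffset noCommitYet = new ConsumerOffset(1, 0L, 998L, 0L); // absent offset defaulted to 0
    System.out.println(committed.getLogEndOffset() - committed.getConsumerOffset());     // 2
    System.out.println(noCommitYet.getLogEndOffset() - noCommitYet.getConsumerOffset()); // 998
  }
}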
@@ -138,6 +138,7 @@ public Optional<KsqlEntity> execute(
Optional.empty(),
Optional.empty(),
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableMap.of()))
.collect(Collectors.toList());
} else {
@@ -208,6 +208,7 @@ private static SourceDescriptionWithWarnings describeSource(
Optional<org.apache.kafka.clients.admin.TopicDescription> topicDescription = Optional.empty();
Optional<ConsumerGroupDescription> consumerGroupDescription = Optional.empty();
Map<TopicPartition, OffsetAndMetadata> topicAndConsumerOffsets = new LinkedHashMap<>();
Map<TopicPartition, ListOffsetsResultInfo> topicAndStartOffsets = new LinkedHashMap<>();
Map<TopicPartition, ListOffsetsResultInfo> topicAndEndOffsets = new LinkedHashMap<>();
final List<KsqlWarning> warnings = new LinkedList<>();
if (extended) {
@@ -223,11 +224,14 @@
serviceContext.getAdminClient().describeConsumerGroups(Collections.singletonList(consumerGroupId)).describedGroups().get(consumerGroupId).get()
);
topicAndConsumerOffsets = serviceContext.getAdminClient().listConsumerGroupOffsets(consumerGroupId).partitionsToOffsetAndMetadata().get();
Map<TopicPartition, OffsetSpec> request = new LinkedHashMap<>();
Map<TopicPartition, OffsetSpec> startRequest = new LinkedHashMap<>();
Map<TopicPartition, OffsetSpec> endRequest = new LinkedHashMap<>();
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry: topicAndConsumerOffsets.entrySet()) {
request.put(entry.getKey(), OffsetSpec.earliest());
startRequest.put(entry.getKey(), OffsetSpec.earliest());
endRequest.put(entry.getKey(), OffsetSpec.latest());
}
topicAndEndOffsets = serviceContext.getAdminClient().listOffsets(request).all().get();
topicAndStartOffsets = serviceContext.getAdminClient().listOffsets(startRequest).all().get();
topicAndEndOffsets = serviceContext.getAdminClient().listOffsets(endRequest).all().get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
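
For anyone reproducing these lookups outside ksql, a minimal standalone sketch of the same AdminClient calls used above; the bootstrap server and group id are assumptions, and error handling is collapsed into throws Exception:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public final class GroupOffsetsProbe {
  public static void main(final String[] args) throws Exception {
    final String groupId = "_confluent-ksql-default_query_CSAS_PAGEVIEWS_0"; // hypothetical
    final Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption
    try (Admin admin = Admin.create(props)) {
      final ConsumerGroupDescription group = admin
          .describeConsumerGroups(Collections.singletonList(groupId))
          .describedGroups().get(groupId).get();
      final Map<TopicPartition, OffsetAndMetadata> committed = admin
          .listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get();
      // One listOffsets request per spec: earliest for log start, latest for log end.
      final Map<TopicPartition, OffsetSpec> startRequest = new HashMap<>();
      final Map<TopicPartition, OffsetSpec> endRequest = new HashMap<>();
      for (final TopicPartition tp : committed.keySet()) {
        startRequest.put(tp, OffsetSpec.earliest());
        endRequest.put(tp, OffsetSpec.latest());
      }
      final Map<TopicPartition, ListOffsetsResultInfo> starts =
          admin.listOffsets(startRequest).all().get();
      final Map<TopicPartition, ListOffsetsResultInfo> ends =
          admin.listOffsets(endRequest).all().get();
      System.out.println("group: " + group.groupId());
      for (final TopicPartition tp : committed.keySet()) {
        System.out.printf("%s start=%d end=%d committed=%d%n",
            tp, starts.get(tp).offset(), ends.get(tp).offset(), committed.get(tp).offset());
      }
    }
  }
}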
@@ -245,6 +249,7 @@
sinkQueries,
topicDescription,
consumerGroupDescription,
topicAndStartOffsets,
topicAndEndOffsets,
topicAndConsumerOffsets
)
@@ -122,6 +122,7 @@ public void shouldReturnStatsBasedOnKafkaTopic() {
Optional.empty(),
Optional.empty(),
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptyMap());

// Then:
@@ -148,6 +149,7 @@ public void shouldReturnEmptyTimestampColumn() {
Optional.empty(),
Optional.empty(),
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptyMap());

// Then:
@@ -173,6 +175,7 @@ public void shouldReturnTimestampColumnIfPresent() {
Optional.empty(),
Optional.empty(),
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptyMap());

// Then:
@@ -149,6 +149,7 @@ public void shouldShowStreamsExtended() {
Optional.of(topicWith1PartitionAndRfOf1),
Optional.empty(),
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableMap.of()),
SourceDescriptionFactory.create(
stream2,
Expand All @@ -158,6 +159,7 @@ public void shouldShowStreamsExtended() {
Optional.of(topicWith1PartitionAndRfOf1),
Optional.empty(),
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableMap.of())
));
}
@@ -222,6 +224,7 @@ public void shouldShowTablesExtended() {
Optional.of(client.describeTopic(table1.getKafkaTopicName())),
Optional.empty(),
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableMap.of()
),
SourceDescriptionFactory.create(
Expand All @@ -232,6 +235,7 @@ public void shouldShowTablesExtended() {
Optional.of(client.describeTopic(table1.getKafkaTopicName())),
Optional.empty(),
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableMap.of()
)
));
@@ -282,6 +286,7 @@ public void shouldShowColumnsSource() {
Optional.empty(),
Optional.empty(),
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableMap.of())));
}

@@ -347,6 +352,7 @@ private static void assertSourceListWithWarning(
Optional.empty(),
Optional.empty(),
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableMap.of()
)
)
@@ -436,6 +442,7 @@ public void shouldAddWarningOnClientExceptionForDescription() {
Optional.empty(),
Optional.empty(),
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableMap.of()
)
)
@@ -487,13 +487,15 @@ public void shouldShowStreamsExtended() {
Optional.of(kafkaTopicClient.describeTopic("KAFKA_TOPIC_2")),
Optional.empty(),
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableMap.of()),
SourceDescriptionFactory.create(
ksqlEngine.getMetaStore().getSource(SourceName.of("new_stream")),
true, Collections.emptyList(), Collections.emptyList(),
Optional.of(kafkaTopicClient.describeTopic("new_topic")),
Optional.empty(),
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableMap.of()))
);
}
@@ -523,13 +525,15 @@ public void shouldShowTablesExtended() {
Optional.of(kafkaTopicClient.describeTopic("KAFKA_TOPIC_1")),
Optional.empty(),
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableMap.of()),
SourceDescriptionFactory.create(
ksqlEngine.getMetaStore().getSource(SourceName.of("new_table")),
true, Collections.emptyList(), Collections.emptyList(),
Optional.of(kafkaTopicClient.describeTopic("new_topic")),
Optional.empty(),
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableMap.of()))
);
}
@@ -577,6 +581,7 @@ public void shouldDescribeStatement() {
Optional.empty(),
Optional.empty(),
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableMap.of()
);

@@ -7,16 +7,16 @@
import java.util.Objects;

@JsonIgnoreProperties(ignoreUnknown = true)
public class SourceConsumerGroupOffsets {
public class ConsumerGroupOffsets {
private final String groupId;
private final String kafkaTopic;
private final List<SourceConsumerOffset> offsets;
private final List<ConsumerOffset> offsets;

@JsonCreator
public SourceConsumerGroupOffsets(
public ConsumerGroupOffsets(
@JsonProperty("groupId") String groupId,
@JsonProperty("kafkaTopic") String kafkaTopic,
@JsonProperty("offsets") List<SourceConsumerOffset> offsets
@JsonProperty("offsets") List<ConsumerOffset> offsets
) {
this.groupId = groupId;
this.kafkaTopic = kafkaTopic;
@@ -31,7 +31,7 @@ public String getKafkaTopic() {
return kafkaTopic;
}

public List<SourceConsumerOffset> getOffsets() {
public List<ConsumerOffset> getOffsets() {
return offsets;
}

@@ -43,7 +43,7 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}
SourceConsumerGroupOffsets that = (SourceConsumerGroupOffsets) o;
ConsumerGroupOffsets that = (ConsumerGroupOffsets) o;
return Objects.equals(groupId, that.groupId) &&
Objects.equals(kafkaTopic, that.kafkaTopic) &&
Objects.equals(offsets, that.offsets);
@@ -6,19 +6,22 @@
import java.util.Objects;

@JsonIgnoreProperties(ignoreUnknown = true)
public class SourceConsumerOffset {
public class ConsumerOffset {

private final int partition;
private final long logStartOffset;
private final long logEndOffset;
private final long consumerOffset;

@JsonCreator
public SourceConsumerOffset(
public ConsumerOffset(
@JsonProperty("partition") int partition,
@JsonProperty("logStartOffset") long logStartOffset,
@JsonProperty("logEndOffset") long logEndOffset,
@JsonProperty("consumerOffset") long consumerOffset
) {
this.partition = partition;
this.logStartOffset = logStartOffset;
this.logEndOffset = logEndOffset;
this.consumerOffset = consumerOffset;
}
@@ -31,6 +34,10 @@ public long getConsumerOffset() {
return consumerOffset;
}

public long getLogStartOffset() {
return logStartOffset;
}

public long getLogEndOffset() {
return logEndOffset;
}
@@ -43,14 +50,15 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}
SourceConsumerOffset that = (SourceConsumerOffset) o;
ConsumerOffset that = (ConsumerOffset) o;
return partition == that.partition &&
logStartOffset == that.logStartOffset &&
logEndOffset == that.logEndOffset &&
consumerOffset == that.consumerOffset;
}

@Override
public int hashCode() {
return Objects.hash(partition, logEndOffset, consumerOffset);
return Objects.hash(partition, logStartOffset, logEndOffset, consumerOffset);
}
}
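
As a quick check of the renamed entities' JSON shape, a sketch assuming Jackson's ObjectMapper is on the classpath along with the two classes as defined above; the group id, topic, and offsets are illustrative:

import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.ksql.rest.entity.ConsumerGroupOffsets;
import io.confluent.ksql.rest.entity.ConsumerOffset;
import java.util.Collections;

public final class ConsumerGroupOffsetsJsonCheck {
  public static void main(final String[] args) throws Exception {
    final ObjectMapper mapper = new ObjectMapper();
    final ConsumerGroupOffsets original = new ConsumerGroupOffsets(
        "example-group", // illustrative
        "pageviews",     // illustrative
        Collections.singletonList(new ConsumerOffset(0, 0L, 1042L, 1040L)));
    final String json = mapper.writeValueAsString(original);
    System.out.println(json);
    // Deserializes through the @JsonCreator constructors; equals() above makes this print true.
    final ConsumerGroupOffsets parsed = mapper.readValue(json, ConsumerGroupOffsets.class);
    System.out.println(original.equals(parsed));
  }
}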