
Commit

fix: checkstyle
jeqo committed Jun 3, 2020
1 parent 1ef5cb3 commit 14eb7a7
Showing 4 changed files with 50 additions and 30 deletions.
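
The hunks below are mechanical checkstyle fixes rather than behavior changes: local variables gain the final modifier, casts get a space before their operand, "){" becomes ") {", over-long statements are wrapped, an unused AdminClient import is dropped, and the SourceConsumerOffsets import moves to its alphabetical position. A minimal before/after sketch of the same patterns, using invented names rather than code from the commit:

// Illustrative only; the class, method, and variable names here are invented.
public final class CheckstyleSketch {

  // Before the fix, a method like this is flagged: the local is not final,
  // there is no space after the cast, and "){" lacks a space.
  static String before(final Object raw) {
    String label = ((String)raw).trim();
    if (label.isEmpty()){
      return "(none)";
    }
    return label;
  }

  // After the fix: final local, a space after the cast, ") {", and long
  // calls wrapped so every line stays inside the length limit.
  static String after(final Object raw) {
    final String label = ((String) raw).trim();
    if (label.isEmpty()) {
      return String.format("%-20s : %s",
          "Label", "(none)");
    }
    return label;
  }
}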
ksqldb-cli/src/main/java/io/confluent/ksql/cli/console/Console.java (26 changes: 16 additions & 10 deletions)
@@ -50,7 +50,6 @@
import io.confluent.ksql.rest.entity.CommandStatusEntity;
import io.confluent.ksql.rest.entity.ConnectorDescription;
import io.confluent.ksql.rest.entity.ConnectorList;
import io.confluent.ksql.rest.entity.SourceConsumerOffsets;
import io.confluent.ksql.rest.entity.CreateConnectorEntity;
import io.confluent.ksql.rest.entity.DropConnectorEntity;
import io.confluent.ksql.rest.entity.ErrorEntity;
@@ -73,6 +72,7 @@
import io.confluent.ksql.rest.entity.QueryDescriptionEntity;
import io.confluent.ksql.rest.entity.QueryDescriptionList;
import io.confluent.ksql.rest.entity.RunningQuery;
import io.confluent.ksql.rest.entity.SourceConsumerOffsets;
import io.confluent.ksql.rest.entity.SourceDescription;
import io.confluent.ksql.rest.entity.SourceDescriptionEntity;
import io.confluent.ksql.rest.entity.SourceDescriptionList;
@@ -199,6 +199,7 @@ private static <T extends KsqlEntity> Handler1<KsqlEntity, Console> tablePrinter
private CliConfig config;

public interface RowCaptor {

void addRow(GenericRow row);

void addRows(List<List<String>> fields);
@@ -317,7 +318,7 @@ public List<HistoryEntry> getHistory() {

public void printErrorMessage(final KsqlErrorMessage errorMessage) {
if (errorMessage instanceof KsqlStatementErrorMessage) {
printKsqlEntityList(((KsqlStatementErrorMessage)errorMessage).getEntities());
printKsqlEntityList(((KsqlStatementErrorMessage) errorMessage).getEntities());
}
printError(errorMessage.getMessage(), errorMessage.toString());
}
@@ -438,7 +439,7 @@ private void printAsTable(final KsqlEntity entity) {
"Unexpected KsqlEntity class: '%s'", entity.getClass().getCanonicalName()
));
}

handler.handle(this, entity);

printWarnings(entity);
@@ -487,8 +488,8 @@ private void printSchema(

private void printTopicInfo(final SourceDescription source) {
final String timestamp = source.getTimestamp().isEmpty()
? "Not set - using <ROWTIME>"
: source.getTimestamp();
? "Not set - using <ROWTIME>"
: source.getTimestamp();

writer().println(String.format("%-20s : %s", "Timestamp field", timestamp));
writer().println(String.format("%-20s : %s", "Key format", source.getKeyFormat()));
@@ -619,15 +620,19 @@ private void printSourceDescription(final SourceDescription source) {
"Statistics of the local KSQL server interaction with the Kafka topic "
+ source.getTopic()
));
Optional<SourceConsumerOffsets> consumerGroupOffsetsOptional = source.getConsumerGroupOffsets();
final Optional<SourceConsumerOffsets> consumerGroupOffsetsOptional = source
.getConsumerGroupOffsets();
if (consumerGroupOffsetsOptional.isPresent()) {
writer().println();
SourceConsumerOffsets sourceConsumerOffsets = consumerGroupOffsetsOptional.get();
writer().println(String.format("%-20s : %s", "Consumer Group", sourceConsumerOffsets.getGroupId()));
writer().println(String.format("%-20s : %s", "Kafka topic", sourceConsumerOffsets.getKafkaTopic()));
final SourceConsumerOffsets sourceConsumerOffsets = consumerGroupOffsetsOptional.get();
writer().println(String.format("%-20s : %s",
"Consumer Group", sourceConsumerOffsets.getGroupId()));
writer().println(String.format("%-20s : %s",
"Kafka topic", sourceConsumerOffsets.getKafkaTopic()));
writer().println("");
final Table taskTable = new Table.Builder()
.withColumnHeaders(ImmutableList.of("Partition", "Start Offset", "End Offset", "Offset", "Lag"))
.withColumnHeaders(
ImmutableList.of("Partition", "Start Offset", "End Offset", "Offset", "Lag"))
.withRows(sourceConsumerOffsets.getOffsets()
.stream()
.map(offset -> ImmutableList.of(
@@ -855,6 +860,7 @@ private void printAsJson(final Object o) {
}

static class NoOpRowCaptor implements RowCaptor {

@Override
public void addRow(final GenericRow row) {
}
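
The reflowed printSourceDescription code above keeps the existing output layout: each label is padded to a 20-character column via "%-20s : %s", followed by a table of per-partition offsets with Partition, Start Offset, End Offset, Offset, and Lag columns. A tiny standalone sketch of that label formatting; the group and topic values are invented placeholders, not real output:

public final class LabelFormatSketch {
  public static void main(final String[] args) {
    // Same "%-20s : %s" pattern used above; the values are placeholders.
    System.out.println(String.format("%-20s : %s",
        "Consumer Group", "_confluent-ksql-default_query_CSAS_PAGEVIEWS_0"));
    System.out.println(String.format("%-20s : %s",
        "Kafka topic", "pageviews"));
    // Prints, with the label column padded to 20 characters:
    // Consumer Group       : _confluent-ksql-default_query_CSAS_PAGEVIEWS_0
    // Kafka topic          : pageviews
  }
}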
@@ -20,7 +20,6 @@
import java.util.Objects;
import java.util.function.Supplier;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.streams.KafkaClientSupplier;

/**
@@ -82,13 +82,13 @@ private static List<SourceConsumerOffset> consumerOffsets(
final Map<TopicPartition, ListOffsetsResultInfo> topicAndEndOffsets,
final Map<TopicPartition, OffsetAndMetadata> topicAndConsumerOffsets
) {
List<SourceConsumerOffset> sourceConsumerOffsets = new ArrayList<>();
final List<SourceConsumerOffset> sourceConsumerOffsets = new ArrayList<>();
for (TopicPartitionInfo topicPartitionInfo : topicDescription.partitions()) {
final TopicPartition tp = new TopicPartition(topicDescription.name(),
topicPartitionInfo.partition());
ListOffsetsResultInfo startOffsetResultInfo = topicAndStartOffsets.get(tp);
ListOffsetsResultInfo endOffsetResultInfo = topicAndEndOffsets.get(tp);
OffsetAndMetadata offsetAndMetadata = topicAndConsumerOffsets.get(tp);
final ListOffsetsResultInfo startOffsetResultInfo = topicAndStartOffsets.get(tp);
final ListOffsetsResultInfo endOffsetResultInfo = topicAndEndOffsets.get(tp);
final OffsetAndMetadata offsetAndMetadata = topicAndConsumerOffsets.get(tp);
sourceConsumerOffsets.add(
new SourceConsumerOffset(
topicPartitionInfo.partition(),
@@ -206,9 +206,9 @@ private static SourceDescriptionWithWarnings describeSource(
), statementText);
}

List<RunningQuery> sourceQueries = getQueries(ksqlEngine,
final List<RunningQuery> sourceQueries = getQueries(ksqlEngine,
q -> q.getSourceNames().contains(dataSource.getName()));
List<RunningQuery> sinkQueries = getQueries(ksqlEngine,
final List<RunningQuery> sinkQueries = getQueries(ksqlEngine,
q -> q.getSinkName().equals(dataSource.getName()));

Optional<org.apache.kafka.clients.admin.TopicDescription> topicDescription = Optional.empty();
@@ -222,31 +222,47 @@
topicDescription = Optional.of(
serviceContext.getTopicClient().describeTopic(dataSource.getKafkaTopicName())
);
if (sourceQueries.isEmpty()){
if (sourceQueries.isEmpty()) {
consumerGroupDescription = Optional.empty();
} else {
QueryId queryId = sourceQueries.get(0).getId();
final QueryId queryId = sourceQueries.get(0).getId();
final String persistenceQueryPrefix =
sessionProperties.getMutableScopedProperties().get(KsqlConfig.KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG).toString();
sessionProperties.getMutableScopedProperties()
.get(KsqlConfig.KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG).toString();
final String applicationId = getQueryApplicationId(
getServiceId(sessionProperties.getMutableScopedProperties()),
persistenceQueryPrefix,
queryId
);
consumerGroupDescription = Optional.of(
serviceContext.getAdminClient().describeConsumerGroups(Collections.singletonList(applicationId)).describedGroups().get(applicationId).get()
serviceContext.getAdminClient()
.describeConsumerGroups(Collections.singletonList(applicationId))
.describedGroups()
.get(applicationId)
.get()
);
topicAndConsumerOffsets = serviceContext.getAdminClient().listConsumerGroupOffsets(applicationId).partitionsToOffsetAndMetadata().get();
Map<TopicPartition, OffsetSpec> startRequest = new LinkedHashMap<>();
Map<TopicPartition, OffsetSpec> endRequest = new LinkedHashMap<>();
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry: topicAndConsumerOffsets.entrySet()) {
topicAndConsumerOffsets = serviceContext.getAdminClient()
.listConsumerGroupOffsets(applicationId)
.partitionsToOffsetAndMetadata()
.get();
final Map<TopicPartition, OffsetSpec> startRequest = new LinkedHashMap<>();
final Map<TopicPartition, OffsetSpec> endRequest = new LinkedHashMap<>();
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry:
topicAndConsumerOffsets.entrySet()) {
startRequest.put(entry.getKey(), OffsetSpec.earliest());
endRequest.put(entry.getKey(), OffsetSpec.latest());
}
topicAndStartOffsets = serviceContext.getAdminClient().listOffsets(startRequest).all().get();
topicAndEndOffsets = serviceContext.getAdminClient().listOffsets(endRequest).all().get();
topicAndStartOffsets = serviceContext.getAdminClient()
.listOffsets(startRequest)
.all()
.get();
topicAndEndOffsets = serviceContext.getAdminClient()
.listOffsets(endRequest)
.all()
.get();
}
} catch (final KafkaException | KafkaResponseGetFailedException | InterruptedException | ExecutionException e) {
} catch (final KafkaException | KafkaResponseGetFailedException
| InterruptedException | ExecutionException e) {
warnings.add(new KsqlWarning("Error from Kafka: " + e.getMessage()));
}
}
@@ -267,8 +283,7 @@ private static SourceDescriptionWithWarnings describeSource(
);
}

private static String getServiceId(
Map<String, Object> mutableScopedProperties) {
private static String getServiceId(final Map<String, Object> mutableScopedProperties) {
return ReservedInternalTopics.KSQL_INTERNAL_TOPIC_PREFIX
+ mutableScopedProperties.get(KsqlConfig.KSQL_SERVICE_ID_CONFIG).toString();
}
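
For readers following the reflowed describeSource logic above: it resolves the consumer group for the source's query, fetches that group's committed offsets, then lists the earliest and latest offsets for the same partitions. A standalone sketch of that sequence against the public Kafka Admin API; the broker address, group id, and the lag arithmetic are assumptions added for illustration, not code from the commit:

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public final class ConsumerLagSketch {

  public static void main(final String[] args) throws Exception {
    final Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
    final String applicationId = "_confluent-ksql-default_query_CSAS_PAGEVIEWS_0"; // assumed group id

    try (Admin admin = Admin.create(props)) {
      // Committed offsets for the query's consumer group.
      final Map<TopicPartition, OffsetAndMetadata> committed = admin
          .listConsumerGroupOffsets(applicationId)
          .partitionsToOffsetAndMetadata()
          .get();

      // Earliest and latest offsets for the same partitions.
      final Map<TopicPartition, OffsetSpec> startRequest = new LinkedHashMap<>();
      final Map<TopicPartition, OffsetSpec> endRequest = new LinkedHashMap<>();
      for (final TopicPartition tp : committed.keySet()) {
        startRequest.put(tp, OffsetSpec.earliest());
        endRequest.put(tp, OffsetSpec.latest());
      }
      final Map<TopicPartition, ListOffsetsResultInfo> start =
          admin.listOffsets(startRequest).all().get();
      final Map<TopicPartition, ListOffsetsResultInfo> end =
          admin.listOffsets(endRequest).all().get();

      for (final TopicPartition tp : committed.keySet()) {
        // Lag computed here as latest minus committed; an illustration, not the commit's code.
        final long lag = end.get(tp).offset() - committed.get(tp).offset();
        System.out.printf("%s start=%d end=%d offset=%d lag=%d%n",
            tp, start.get(tp).offset(), end.get(tp).offset(),
            committed.get(tp).offset(), lag);
      }
    }
  }
}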
