From d4845b4e85f813c94fd63394e373d3bac36e1a13 Mon Sep 17 00:00:00 2001
From: Jorge Esteban Quilcate Otoya
Date: Tue, 2 Jun 2020 09:34:50 +0100
Subject: [PATCH] fix: checkstyle

---
 .../confluent/ksql/cli/console/Console.java    | 26 ++++++-----
 .../services/SandboxedServiceContext.java      |  1 -
 .../rest/entity/SourceDescriptionFactory.java  |  8 ++--
 .../server/execution/ListSourceExecutor.java   | 45 ++++++++++++-------
 4 files changed, 50 insertions(+), 30 deletions(-)

diff --git a/ksqldb-cli/src/main/java/io/confluent/ksql/cli/console/Console.java b/ksqldb-cli/src/main/java/io/confluent/ksql/cli/console/Console.java
index e6e53587eb77..05b5290d2ffc 100644
--- a/ksqldb-cli/src/main/java/io/confluent/ksql/cli/console/Console.java
+++ b/ksqldb-cli/src/main/java/io/confluent/ksql/cli/console/Console.java
@@ -50,7 +50,6 @@
 import io.confluent.ksql.rest.entity.CommandStatusEntity;
 import io.confluent.ksql.rest.entity.ConnectorDescription;
 import io.confluent.ksql.rest.entity.ConnectorList;
-import io.confluent.ksql.rest.entity.SourceConsumerOffsets;
 import io.confluent.ksql.rest.entity.CreateConnectorEntity;
 import io.confluent.ksql.rest.entity.DropConnectorEntity;
 import io.confluent.ksql.rest.entity.ErrorEntity;
@@ -73,6 +72,7 @@
 import io.confluent.ksql.rest.entity.QueryDescriptionEntity;
 import io.confluent.ksql.rest.entity.QueryDescriptionList;
 import io.confluent.ksql.rest.entity.RunningQuery;
+import io.confluent.ksql.rest.entity.SourceConsumerOffsets;
 import io.confluent.ksql.rest.entity.SourceDescription;
 import io.confluent.ksql.rest.entity.SourceDescriptionEntity;
 import io.confluent.ksql.rest.entity.SourceDescriptionList;
@@ -199,6 +199,7 @@ private static Handler1 tablePrinter
   private CliConfig config;
 
   public interface RowCaptor {
+
     void addRow(GenericRow row);
 
     void addRows(List<List<String>> fields);
@@ -317,7 +318,7 @@ public List getHistory() {
 
   public void printErrorMessage(final KsqlErrorMessage errorMessage) {
     if (errorMessage instanceof KsqlStatementErrorMessage) {
-      printKsqlEntityList(((KsqlStatementErrorMessage)errorMessage).getEntities());
+      printKsqlEntityList(((KsqlStatementErrorMessage) errorMessage).getEntities());
     }
     printError(errorMessage.getMessage(), errorMessage.toString());
   }
@@ -438,7 +439,7 @@ private void printAsTable(final KsqlEntity entity) {
           "Unexpected KsqlEntity class: '%s'", entity.getClass().getCanonicalName()
       ));
     }
-    
+
     handler.handle(this, entity);
 
     printWarnings(entity);
@@ -487,8 +488,8 @@ private void printSchema(
 
   private void printTopicInfo(final SourceDescription source) {
     final String timestamp = source.getTimestamp().isEmpty()
-        ? "Not set - using "
-        : source.getTimestamp();
+        ? "Not set - using "
"Not set - using " + : source.getTimestamp(); writer().println(String.format("%-20s : %s", "Timestamp field", timestamp)); writer().println(String.format("%-20s : %s", "Key format", source.getKeyFormat())); @@ -619,15 +620,19 @@ private void printSourceDescription(final SourceDescription source) { "Statistics of the local KSQL server interaction with the Kafka topic " + source.getTopic() )); - Optional consumerGroupOffsetsOptional = source.getConsumerGroupOffsets(); + final Optional consumerGroupOffsetsOptional = source + .getConsumerGroupOffsets(); if (consumerGroupOffsetsOptional.isPresent()) { writer().println(); - SourceConsumerOffsets sourceConsumerOffsets = consumerGroupOffsetsOptional.get(); - writer().println(String.format("%-20s : %s", "Consumer Group", sourceConsumerOffsets.getGroupId())); - writer().println(String.format("%-20s : %s", "Kafka topic", sourceConsumerOffsets.getKafkaTopic())); + final SourceConsumerOffsets sourceConsumerOffsets = consumerGroupOffsetsOptional.get(); + writer().println(String.format("%-20s : %s", + "Consumer Group", sourceConsumerOffsets.getGroupId())); + writer().println(String.format("%-20s : %s", + "Kafka topic", sourceConsumerOffsets.getKafkaTopic())); writer().println(""); final Table taskTable = new Table.Builder() - .withColumnHeaders(ImmutableList.of("Partition", "Start Offset", "End Offset", "Offset", "Lag")) + .withColumnHeaders( + ImmutableList.of("Partition", "Start Offset", "End Offset", "Offset", "Lag")) .withRows(sourceConsumerOffsets.getOffsets() .stream() .map(offset -> ImmutableList.of( @@ -855,6 +860,7 @@ private void printAsJson(final Object o) { } static class NoOpRowCaptor implements RowCaptor { + @Override public void addRow(final GenericRow row) { } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/services/SandboxedServiceContext.java b/ksqldb-engine/src/main/java/io/confluent/ksql/services/SandboxedServiceContext.java index 01c62b3fb491..a80fbe028184 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/services/SandboxedServiceContext.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/services/SandboxedServiceContext.java @@ -20,7 +20,6 @@ import java.util.Objects; import java.util.function.Supplier; import org.apache.kafka.clients.admin.Admin; -import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.streams.KafkaClientSupplier; /** diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/entity/SourceDescriptionFactory.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/entity/SourceDescriptionFactory.java index c0d61c791246..31b3ba06e069 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/entity/SourceDescriptionFactory.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/entity/SourceDescriptionFactory.java @@ -82,13 +82,13 @@ private static List consumerOffsets( final Map topicAndEndOffsets, final Map topicAndConsumerOffsets ) { - List sourceConsumerOffsets = new ArrayList<>(); + final List sourceConsumerOffsets = new ArrayList<>(); for (TopicPartitionInfo topicPartitionInfo : topicDescription.partitions()) { final TopicPartition tp = new TopicPartition(topicDescription.name(), topicPartitionInfo.partition()); - ListOffsetsResultInfo startOffsetResultInfo = topicAndStartOffsets.get(tp); - ListOffsetsResultInfo endOffsetResultInfo = topicAndEndOffsets.get(tp); - OffsetAndMetadata offsetAndMetadata = topicAndConsumerOffsets.get(tp); + final ListOffsetsResultInfo startOffsetResultInfo = topicAndStartOffsets.get(tp); + final 
+      final OffsetAndMetadata offsetAndMetadata = topicAndConsumerOffsets.get(tp);
       sourceConsumerOffsets.add(
           new SourceConsumerOffset(
               topicPartitionInfo.partition(),
diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListSourceExecutor.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListSourceExecutor.java
index 3ee60fe129c8..13ab3962a0a6 100644
--- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListSourceExecutor.java
+++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListSourceExecutor.java
@@ -206,9 +206,9 @@ private static SourceDescriptionWithWarnings describeSource(
       ), statementText);
     }
 
-    List sourceQueries = getQueries(ksqlEngine,
+    final List sourceQueries = getQueries(ksqlEngine,
         q -> q.getSourceNames().contains(dataSource.getName()));
-    List sinkQueries = getQueries(ksqlEngine,
+    final List sinkQueries = getQueries(ksqlEngine,
         q -> q.getSinkName().equals(dataSource.getName()));
 
     Optional<TopicDescription> topicDescription = Optional.empty();
@@ -222,31 +222,47 @@ private static SourceDescriptionWithWarnings describeSource(
         topicDescription = Optional.of(
             serviceContext.getTopicClient().describeTopic(dataSource.getKafkaTopicName())
         );
-        if (sourceQueries.isEmpty()){
+        if (sourceQueries.isEmpty()) {
           consumerGroupDescription = Optional.empty();
         } else {
-          QueryId queryId = sourceQueries.get(0).getId();
+          final QueryId queryId = sourceQueries.get(0).getId();
           final String persistenceQueryPrefix =
-              sessionProperties.getMutableScopedProperties().get(KsqlConfig.KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG).toString();
+              sessionProperties.getMutableScopedProperties()
+                  .get(KsqlConfig.KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG).toString();
           final String applicationId = getQueryApplicationId(
               getServiceId(sessionProperties.getMutableScopedProperties()),
               persistenceQueryPrefix,
              queryId
           );
           consumerGroupDescription = Optional.of(
-              serviceContext.getAdminClient().describeConsumerGroups(Collections.singletonList(applicationId)).describedGroups().get(applicationId).get()
+              serviceContext.getAdminClient()
+                  .describeConsumerGroups(Collections.singletonList(applicationId))
+                  .describedGroups()
+                  .get(applicationId)
+                  .get()
           );
-          topicAndConsumerOffsets = serviceContext.getAdminClient().listConsumerGroupOffsets(applicationId).partitionsToOffsetAndMetadata().get();
-          Map<TopicPartition, OffsetSpec> startRequest = new LinkedHashMap<>();
-          Map<TopicPartition, OffsetSpec> endRequest = new LinkedHashMap<>();
-          for (Map.Entry<TopicPartition, OffsetAndMetadata> entry: topicAndConsumerOffsets.entrySet()) {
+          topicAndConsumerOffsets = serviceContext.getAdminClient()
+              .listConsumerGroupOffsets(applicationId)
+              .partitionsToOffsetAndMetadata()
+              .get();
+          final Map<TopicPartition, OffsetSpec> startRequest = new LinkedHashMap<>();
+          final Map<TopicPartition, OffsetSpec> endRequest = new LinkedHashMap<>();
+          for (Map.Entry<TopicPartition, OffsetAndMetadata> entry:
+              topicAndConsumerOffsets.entrySet()) {
             startRequest.put(entry.getKey(), OffsetSpec.earliest());
             endRequest.put(entry.getKey(), OffsetSpec.latest());
           }
-          topicAndStartOffsets = serviceContext.getAdminClient().listOffsets(startRequest).all().get();
-          topicAndEndOffsets = serviceContext.getAdminClient().listOffsets(endRequest).all().get();
+          topicAndStartOffsets = serviceContext.getAdminClient()
+              .listOffsets(startRequest)
+              .all()
+              .get();
+          topicAndEndOffsets = serviceContext.getAdminClient()
+              .listOffsets(endRequest)
+              .all()
+              .get();
         }
-      } catch (final KafkaException | KafkaResponseGetFailedException | InterruptedException | ExecutionException e) {
+      } catch (final KafkaException | KafkaResponseGetFailedException
+          | InterruptedException | ExecutionException e) {
         warnings.add(new KsqlWarning("Error from Kafka: " + e.getMessage()));
       }
     }
@@ -267,8 +283,7 @@ private static SourceDescriptionWithWarnings describeSource(
     );
   }
 
-  private static String getServiceId(
-      Map mutableScopedProperties) {
+  private static String getServiceId(final Map mutableScopedProperties) {
     return ReservedInternalTopics.KSQL_INTERNAL_TOPIC_PREFIX
         + mutableScopedProperties.get(KsqlConfig.KSQL_SERVICE_ID_CONFIG).toString();
   }
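
A minimal, self-contained sketch (not part of the patch) of the Kafka Admin API pattern the reformatted ListSourceExecutor code relies on: committed offsets via listConsumerGroupOffsets, earliest/latest offsets via listOffsets with OffsetSpec, and per-partition lag as the difference. The bootstrap server, group id, and class name below are placeholder assumptions.

// Sketch only: standalone illustration of the Admin API calls used above.
// "localhost:9092" and "my-consumer-group" are hypothetical placeholder values.
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public final class ConsumerLagSketch {

  public static void main(final String[] args) throws ExecutionException, InterruptedException {
    final Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    try (Admin admin = Admin.create(props)) {
      // Committed offsets for every partition the consumer group has consumed.
      final Map<TopicPartition, OffsetAndMetadata> committed = admin
          .listConsumerGroupOffsets("my-consumer-group")
          .partitionsToOffsetAndMetadata()
          .get();

      // Build earliest/latest offset requests for the same partitions.
      final Map<TopicPartition, OffsetSpec> startRequest = new HashMap<>();
      final Map<TopicPartition, OffsetSpec> endRequest = new HashMap<>();
      for (final TopicPartition tp : committed.keySet()) {
        startRequest.put(tp, OffsetSpec.earliest());
        endRequest.put(tp, OffsetSpec.latest());
      }

      final Map<TopicPartition, ListOffsetsResultInfo> startOffsets =
          admin.listOffsets(startRequest).all().get();
      final Map<TopicPartition, ListOffsetsResultInfo> endOffsets =
          admin.listOffsets(endRequest).all().get();

      // Lag per partition = log end offset minus committed offset.
      for (final Map.Entry<TopicPartition, OffsetAndMetadata> entry : committed.entrySet()) {
        if (entry.getValue() == null) {
          continue; // no committed offset for this partition
        }
        final TopicPartition tp = entry.getKey();
        final long start = startOffsets.get(tp).offset();
        final long end = endOffsets.get(tp).offset();
        final long current = entry.getValue().offset();
        System.out.printf("%s start=%d end=%d offset=%d lag=%d%n",
            tp, start, end, current, end - current);
      }
    }
  }
}

Production code, as in the patch above, would surface Kafka failures as warnings (KsqlWarning "Error from Kafka: ...") rather than failing the whole DESCRIBE.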