From 809daab99332354cc079e914c9a51c1b68b4c95e Mon Sep 17 00:00:00 2001 From: iliax Date: Fri, 17 Mar 2023 13:32:05 +0400 Subject: [PATCH 1/4] 1. Polling timeouts made configurable 2. polling-related classes moved to emitter package --- .../kafka/ui/config/ClustersProperties.java | 10 +++ .../kafka/ui/emitter/AbstractEmitter.java | 17 ++-- .../ui/emitter/BackwardRecordEmitter.java | 24 +++--- .../kafka/ui/emitter/EmptyPollsCounter.java | 28 +++++++ .../ui/emitter/ForwardRecordEmitter.java | 13 ++- .../kafka/ui/emitter/PollingSettings.java | 79 +++++++++++++++++++ .../{util => emitter}/PollingThrottler.java | 3 +- .../{util => emitter}/ResultSizeLimiter.java | 2 +- .../kafka/ui/emitter/TailingEmitter.java | 5 +- .../kafka/ui/model/KafkaCluster.java | 5 +- .../kafka/ui/service/ClustersStorage.java | 2 +- .../kafka/ui/service/KafkaClusterFactory.java | 7 +- .../kafka/ui/service/MessagesService.java | 8 +- .../service/analyze/TopicAnalysisService.java | 19 ++--- .../kafka/ui/service/RecordEmitterTest.java | 22 +++--- .../kafka/ui/util/PollingThrottlerTest.java | 1 + 16 files changed, 176 insertions(+), 69 deletions(-) create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/EmptyPollsCounter.java create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/PollingSettings.java rename kafka-ui-api/src/main/java/com/provectus/kafka/ui/{util => emitter}/PollingThrottler.java (94%) rename kafka-ui-api/src/main/java/com/provectus/kafka/ui/{util => emitter}/ResultSizeLimiter.java (93%) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java index 2cd5e0e69cd..e6dc84ad89b 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java @@ -25,6 +25,8 @@ public class ClustersProperties { List clusters = new ArrayList<>(); + PollingProperties polling = new PollingProperties(); + @Data public static class Cluster { String name; @@ -47,6 +49,14 @@ public static class Cluster { TruststoreConfig ssl; } + @Data + public static class PollingProperties { + Integer pollTimeoutMs; + Integer topicPollTimeoutMs; + Integer partitionPollTimeout; + Integer noDataEmptyPolls; + } + @Data @ToString(exclude = "password") public static class MetricsConfigData { diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java index 7cd01061d04..646cf81ca67 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java @@ -4,7 +4,6 @@ import com.provectus.kafka.ui.model.TopicMessageEventDTO; import com.provectus.kafka.ui.model.TopicMessagePhaseDTO; import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer; -import com.provectus.kafka.ui.util.PollingThrottler; import java.time.Duration; import java.time.Instant; import org.apache.kafka.clients.consumer.Consumer; @@ -14,27 +13,21 @@ import reactor.core.publisher.FluxSink; public abstract class AbstractEmitter { - private static final Duration DEFAULT_POLL_TIMEOUT_MS = Duration.ofMillis(1000L); - - // In some situations it is hard to say whether records range (between two offsets) was fully polled. - // This happens when we have holes in records sequences that is usual case for compact topics or - // topics with transactional writes. In such cases if you want to poll all records between offsets X and Y - // there is no guarantee that you will ever see record with offset Y. - // To workaround this we can assume that after N consecutive empty polls all target messages were read. - public static final int NO_MORE_DATA_EMPTY_POLLS_COUNT = 3; private final ConsumerRecordDeserializer recordDeserializer; private final ConsumingStats consumingStats = new ConsumingStats(); private final PollingThrottler throttler; + protected final PollingSettings pollingSettings; - protected AbstractEmitter(ConsumerRecordDeserializer recordDeserializer, PollingThrottler throttler) { + protected AbstractEmitter(ConsumerRecordDeserializer recordDeserializer, PollingSettings pollingSettings) { this.recordDeserializer = recordDeserializer; - this.throttler = throttler; + this.pollingSettings = pollingSettings; + this.throttler = pollingSettings.getPollingThrottler(); } protected ConsumerRecords poll( FluxSink sink, Consumer consumer) { - return poll(sink, consumer, DEFAULT_POLL_TIMEOUT_MS); + return poll(sink, consumer, pollingSettings.getPollTimeout()); } protected ConsumerRecords poll( diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java index 996f8b9f70c..c4f6bac24b9 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java @@ -3,15 +3,12 @@ import com.provectus.kafka.ui.model.ConsumerPosition; import com.provectus.kafka.ui.model.TopicMessageEventDTO; import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer; -import com.provectus.kafka.ui.util.PollingThrottler; -import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.TreeMap; import java.util.function.Supplier; -import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -26,8 +23,6 @@ public class BackwardRecordEmitter extends AbstractEmitter implements java.util.function.Consumer> { - private static final Duration POLL_TIMEOUT = Duration.ofMillis(200); - private final Supplier> consumerSupplier; private final ConsumerPosition consumerPosition; private final int messagesPerPage; @@ -37,8 +32,8 @@ public BackwardRecordEmitter( ConsumerPosition consumerPosition, int messagesPerPage, ConsumerRecordDeserializer recordDeserializer, - PollingThrottler throttler) { - super(recordDeserializer, throttler); + PollingSettings pollingSettings) { + super(recordDeserializer, pollingSettings); this.consumerPosition = consumerPosition; this.messagesPerPage = messagesPerPage; this.consumerSupplier = consumerSupplier; @@ -109,17 +104,18 @@ private List> partitionPollIteration( var recordsToSend = new ArrayList>(); - // we use empty polls counting to verify that partition was fully read - for (int emptyPolls = 0; recordsToSend.size() < desiredMsgsToPoll && emptyPolls < NO_MORE_DATA_EMPTY_POLLS_COUNT;) { - var polledRecords = poll(sink, consumer, POLL_TIMEOUT); - log.debug("{} records polled from {}", polledRecords.count(), tp); + EmptyPollsCounter emptyPolls = pollingSettings.createEmptyPollsCounter(); + while (!sink.isCancelled() + && recordsToSend.size() < desiredMsgsToPoll + && !emptyPolls.noDataEmptyCountsReached()) { + var polledRecords = poll(sink, consumer, pollingSettings.getPartitionPollTimeout()); + emptyPolls.count(polledRecords); - // counting sequential empty polls - emptyPolls = polledRecords.isEmpty() ? emptyPolls + 1 : 0; + log.debug("{} records polled from {}", polledRecords.count(), tp); var filteredRecords = polledRecords.records(tp).stream() .filter(r -> r.offset() < toOffset) - .collect(Collectors.toList()); + .toList(); if (!polledRecords.isEmpty() && filteredRecords.isEmpty()) { // we already read all messages in target offsets interval diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/EmptyPollsCounter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/EmptyPollsCounter.java new file mode 100644 index 00000000000..13ab71e5944 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/EmptyPollsCounter.java @@ -0,0 +1,28 @@ +package com.provectus.kafka.ui.emitter; + +import org.apache.kafka.clients.consumer.ConsumerRecords; + +// In some situations it is hard to say whether records range (between two offsets) was fully polled. +// This happens when we have holes in records sequences that is usual case for compact topics or +// topics with transactional writes. In such cases if you want to poll all records between offsets X and Y +// there is no guarantee that you will ever see record with offset Y. +// To workaround this we can assume that after N consecutive empty polls all target messages were read. +public class EmptyPollsCounter { + + private final int maxEmptyPolls; + + private int emptyPolls = 0; + + EmptyPollsCounter(int maxEmptyPolls) { + this.maxEmptyPolls = maxEmptyPolls; + } + + public void count(ConsumerRecords polled) { + emptyPolls = polled.isEmpty() ? emptyPolls + 1 : 0; + } + + public boolean noDataEmptyCountsReached() { + return emptyPolls >= maxEmptyPolls; + } + +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java index 9fadb149d48..26b297150c7 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java @@ -3,7 +3,6 @@ import com.provectus.kafka.ui.model.ConsumerPosition; import com.provectus.kafka.ui.model.TopicMessageEventDTO; import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer; -import com.provectus.kafka.ui.util.PollingThrottler; import java.util.function.Supplier; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -25,8 +24,8 @@ public ForwardRecordEmitter( Supplier> consumerSupplier, ConsumerPosition position, ConsumerRecordDeserializer recordDeserializer, - PollingThrottler throttler) { - super(recordDeserializer, throttler); + PollingSettings pollingSettings) { + super(recordDeserializer, pollingSettings); this.position = position; this.consumerSupplier = consumerSupplier; } @@ -39,16 +38,16 @@ public void accept(FluxSink sink) { var seekOperations = SeekOperations.create(consumer, position); seekOperations.assignAndSeekNonEmptyPartitions(); - // we use empty polls counting to verify that topic was fully read - int emptyPolls = 0; + EmptyPollsCounter emptyPolls = pollingSettings.createEmptyPollsCounter(); while (!sink.isCancelled() && !seekOperations.assignedPartitionsFullyPolled() - && emptyPolls < NO_MORE_DATA_EMPTY_POLLS_COUNT) { + && !emptyPolls.noDataEmptyCountsReached()) { sendPhase(sink, "Polling"); ConsumerRecords records = poll(sink, consumer); + emptyPolls.count(records); + log.debug("{} records polled", records.count()); - emptyPolls = records.isEmpty() ? emptyPolls + 1 : 0; for (ConsumerRecord msg : records) { if (!sink.isCancelled()) { diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/PollingSettings.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/PollingSettings.java new file mode 100644 index 00000000000..10fdb6e5b21 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/PollingSettings.java @@ -0,0 +1,79 @@ +package com.provectus.kafka.ui.emitter; + +import com.provectus.kafka.ui.config.ClustersProperties; +import java.time.Duration; +import java.util.Optional; +import java.util.function.Supplier; + +public class PollingSettings { + + private static final Duration DEFAULT_POLL_TIMEOUT = Duration.ofMillis(1_000); + private static final Duration DEFAULT_PARTITION_POLL_TIMEOUT = Duration.ofMillis(200); + private static final int DEFAULT_NO_DATA_EMPTY_POLLS = 3; + + private final Duration pollTimeout; + private final Duration partitionPollTimeout; + private final int notDataEmptyPolls; //see EmptyPollsCounter docs + + private final Supplier throttlerSupplier; + + public static PollingSettings create(ClustersProperties.Cluster cluster, + ClustersProperties clustersProperties) { + var pollingProps = Optional.ofNullable(clustersProperties.getPolling()) + .orElseGet(ClustersProperties.PollingProperties::new); + + var pollTimeout = pollingProps.getPollTimeoutMs() != null + ? Duration.ofMillis(pollingProps.getPollTimeoutMs()) + : DEFAULT_POLL_TIMEOUT; + + var partitionPollTimeout = pollingProps.getPartitionPollTimeout() != null + ? Duration.ofMillis(pollingProps.getPartitionPollTimeout()) + : Duration.ofMillis(pollTimeout.toMillis() / 5); + + int notDataEmptyPolls = pollingProps.getNoDataEmptyPolls() != null + ? pollingProps.getNoDataEmptyPolls() + : DEFAULT_NO_DATA_EMPTY_POLLS; + + return new PollingSettings( + pollTimeout, + partitionPollTimeout, + notDataEmptyPolls, + PollingThrottler.throttlerSupplier(cluster) + ); + } + + public static PollingSettings createDefault() { + return new PollingSettings( + DEFAULT_POLL_TIMEOUT, + DEFAULT_PARTITION_POLL_TIMEOUT, + DEFAULT_NO_DATA_EMPTY_POLLS, + PollingThrottler::noop + ); + } + + private PollingSettings(Duration pollTimeout, + Duration partitionPollTimeout, + int notDataEmptyPolls, + Supplier throttlerSupplier) { + this.pollTimeout = pollTimeout; + this.partitionPollTimeout = partitionPollTimeout; + this.notDataEmptyPolls = notDataEmptyPolls; + this.throttlerSupplier = throttlerSupplier; + } + + public EmptyPollsCounter createEmptyPollsCounter() { + return new EmptyPollsCounter(notDataEmptyPolls); + } + + public Duration getPollTimeout() { + return pollTimeout; + } + + public Duration getPartitionPollTimeout() { + return partitionPollTimeout; + } + + public PollingThrottler getPollingThrottler() { + return throttlerSupplier.get(); + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/PollingThrottler.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/PollingThrottler.java similarity index 94% rename from kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/PollingThrottler.java rename to kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/PollingThrottler.java index bd2e97da97a..15dfcd91c94 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/PollingThrottler.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/PollingThrottler.java @@ -1,8 +1,9 @@ -package com.provectus.kafka.ui.util; +package com.provectus.kafka.ui.emitter; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.RateLimiter; import com.provectus.kafka.ui.config.ClustersProperties; +import com.provectus.kafka.ui.util.ConsumerRecordsUtil; import java.util.function.Supplier; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecords; diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ResultSizeLimiter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ResultSizeLimiter.java similarity index 93% rename from kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ResultSizeLimiter.java rename to kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ResultSizeLimiter.java index 64fcb215090..a0fa5bcb938 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ResultSizeLimiter.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ResultSizeLimiter.java @@ -1,4 +1,4 @@ -package com.provectus.kafka.ui.util; +package com.provectus.kafka.ui.emitter; import com.provectus.kafka.ui.model.TopicMessageEventDTO; import java.util.concurrent.atomic.AtomicInteger; diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/TailingEmitter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/TailingEmitter.java index 06cd8dad998..4554069c1c9 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/TailingEmitter.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/TailingEmitter.java @@ -3,7 +3,6 @@ import com.provectus.kafka.ui.model.ConsumerPosition; import com.provectus.kafka.ui.model.TopicMessageEventDTO; import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer; -import com.provectus.kafka.ui.util.PollingThrottler; import java.util.HashMap; import java.util.function.Supplier; import lombok.extern.slf4j.Slf4j; @@ -22,8 +21,8 @@ public class TailingEmitter extends AbstractEmitter public TailingEmitter(Supplier> consumerSupplier, ConsumerPosition consumerPosition, ConsumerRecordDeserializer recordDeserializer, - PollingThrottler throttler) { - super(recordDeserializer, throttler); + PollingSettings pollingSettings) { + super(recordDeserializer, pollingSettings); this.consumerSupplier = consumerSupplier; this.consumerPosition = consumerPosition; } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java index 9933d7e4675..1e2903dbcc9 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java @@ -2,14 +2,13 @@ import com.provectus.kafka.ui.config.ClustersProperties; import com.provectus.kafka.ui.connect.api.KafkaConnectClientApi; +import com.provectus.kafka.ui.emitter.PollingSettings; import com.provectus.kafka.ui.service.ksql.KsqlApiClient; import com.provectus.kafka.ui.service.masking.DataMasking; import com.provectus.kafka.ui.sr.api.KafkaSrClientApi; -import com.provectus.kafka.ui.util.PollingThrottler; import com.provectus.kafka.ui.util.ReactiveFailover; import java.util.Map; import java.util.Properties; -import java.util.function.Supplier; import lombok.AccessLevel; import lombok.AllArgsConstructor; import lombok.Builder; @@ -28,7 +27,7 @@ public class KafkaCluster { private final boolean readOnly; private final MetricsConfig metricsConfig; private final DataMasking masking; - private final Supplier throttler; + private final PollingSettings pollingSettings; private final ReactiveFailover schemaRegistryClient; private final Map> connectsClients; private final ReactiveFailover ksqlClient; diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClustersStorage.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClustersStorage.java index c0143ad8c59..ee08d6392d8 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClustersStorage.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClustersStorage.java @@ -14,7 +14,7 @@ public class ClustersStorage { public ClustersStorage(ClustersProperties properties, KafkaClusterFactory factory) { var builder = ImmutableMap.builder(); - properties.getClusters().forEach(c -> builder.put(c.getName(), factory.create(c))); + properties.getClusters().forEach(c -> builder.put(c.getName(), factory.create(properties, c))); this.kafkaClusters = builder.build(); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaClusterFactory.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaClusterFactory.java index 26a9d40647f..357a548a637 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaClusterFactory.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaClusterFactory.java @@ -3,6 +3,7 @@ import com.provectus.kafka.ui.client.RetryingKafkaConnectClient; import com.provectus.kafka.ui.config.ClustersProperties; import com.provectus.kafka.ui.connect.api.KafkaConnectClientApi; +import com.provectus.kafka.ui.emitter.PollingSettings; import com.provectus.kafka.ui.model.ApplicationPropertyValidationDTO; import com.provectus.kafka.ui.model.ClusterConfigValidationDTO; import com.provectus.kafka.ui.model.KafkaCluster; @@ -12,7 +13,6 @@ import com.provectus.kafka.ui.sr.ApiClient; import com.provectus.kafka.ui.sr.api.KafkaSrClientApi; import com.provectus.kafka.ui.util.KafkaServicesValidation; -import com.provectus.kafka.ui.util.PollingThrottler; import com.provectus.kafka.ui.util.ReactiveFailover; import com.provectus.kafka.ui.util.WebClientConfigurator; import java.util.HashMap; @@ -41,7 +41,8 @@ public class KafkaClusterFactory { @Value("${webclient.max-in-memory-buffer-size:20MB}") private DataSize maxBuffSize; - public KafkaCluster create(ClustersProperties.Cluster clusterProperties) { + public KafkaCluster create(ClustersProperties properties, + ClustersProperties.Cluster clusterProperties) { KafkaCluster.KafkaClusterBuilder builder = KafkaCluster.builder(); builder.name(clusterProperties.getName()); @@ -49,7 +50,7 @@ public KafkaCluster create(ClustersProperties.Cluster clusterProperties) { builder.properties(convertProperties(clusterProperties.getProperties())); builder.readOnly(clusterProperties.isReadOnly()); builder.masking(DataMasking.create(clusterProperties.getMasking())); - builder.throttler(PollingThrottler.throttlerSupplier(clusterProperties)); + builder.pollingSettings(PollingSettings.create(clusterProperties, properties)); if (schemaRegistryConfigured(clusterProperties)) { builder.schemaRegistryClient(schemaRegistryClient(clusterProperties)); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java index d1f0e261a8c..0b54c9d7835 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java @@ -17,7 +17,7 @@ import com.provectus.kafka.ui.serde.api.Serde; import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer; import com.provectus.kafka.ui.serdes.ProducerRecordCreator; -import com.provectus.kafka.ui.util.ResultSizeLimiter; +import com.provectus.kafka.ui.emitter.ResultSizeLimiter; import com.provectus.kafka.ui.util.SslPropertiesUtil; import java.util.List; import java.util.Map; @@ -169,7 +169,7 @@ private Flux loadMessagesImpl(KafkaCluster cluster, () -> consumerGroupService.createConsumer(cluster), consumerPosition, recordDeserializer, - cluster.getThrottler().get() + cluster.getPollingSettings() ); } else if (seekDirection.equals(SeekDirectionDTO.BACKWARD)) { emitter = new BackwardRecordEmitter( @@ -177,14 +177,14 @@ private Flux loadMessagesImpl(KafkaCluster cluster, consumerPosition, limit, recordDeserializer, - cluster.getThrottler().get() + cluster.getPollingSettings() ); } else { emitter = new TailingEmitter( () -> consumerGroupService.createConsumer(cluster), consumerPosition, recordDeserializer, - cluster.getThrottler().get() + cluster.getPollingSettings() ); } MessageFilterStats filterStats = new MessageFilterStats(); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisService.java index 9b6bd787671..47bc1305646 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisService.java @@ -1,14 +1,14 @@ package com.provectus.kafka.ui.service.analyze; -import static com.provectus.kafka.ui.emitter.AbstractEmitter.NO_MORE_DATA_EMPTY_POLLS_COUNT; - +import com.provectus.kafka.ui.emitter.EmptyPollsCounter; import com.provectus.kafka.ui.emitter.OffsetsInfo; +import com.provectus.kafka.ui.emitter.PollingSettings; import com.provectus.kafka.ui.exception.TopicAnalysisException; import com.provectus.kafka.ui.model.KafkaCluster; import com.provectus.kafka.ui.model.TopicAnalysisDTO; import com.provectus.kafka.ui.service.ConsumerGroupService; import com.provectus.kafka.ui.service.TopicsService; -import com.provectus.kafka.ui.util.PollingThrottler; +import com.provectus.kafka.ui.emitter.PollingThrottler; import java.io.Closeable; import java.time.Duration; import java.time.Instant; @@ -63,7 +63,7 @@ private synchronized void startAnalysis(KafkaCluster cluster, if (analysisTasksStore.isAnalysisInProgress(topicId)) { throw new TopicAnalysisException("Topic is already analyzing"); } - var task = new AnalysisTask(cluster, topicId, partitionsCnt, approxNumberOfMsgs, cluster.getThrottler().get()); + var task = new AnalysisTask(cluster, topicId, partitionsCnt, approxNumberOfMsgs, cluster.getPollingSettings()); analysisTasksStore.registerNewTask(topicId, task); Schedulers.boundedElastic().schedule(task); } @@ -83,6 +83,7 @@ class AnalysisTask implements Runnable, Closeable { private final TopicIdentity topicId; private final int partitionsCnt; private final long approxNumberOfMsgs; + private final EmptyPollsCounter emptyPollsCounter; private final PollingThrottler throttler; private final TopicAnalysisStats totalStats = new TopicAnalysisStats(); @@ -91,7 +92,7 @@ class AnalysisTask implements Runnable, Closeable { private final KafkaConsumer consumer; AnalysisTask(KafkaCluster cluster, TopicIdentity topicId, int partitionsCnt, - long approxNumberOfMsgs, PollingThrottler throttler) { + long approxNumberOfMsgs, PollingSettings pollingSettings) { this.topicId = topicId; this.approxNumberOfMsgs = approxNumberOfMsgs; this.partitionsCnt = partitionsCnt; @@ -103,7 +104,8 @@ class AnalysisTask implements Runnable, Closeable { ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100000" ) ); - this.throttler = throttler; + this.throttler = pollingSettings.getPollingThrottler(); + this.emptyPollsCounter = pollingSettings.createEmptyPollsCounter(); } @Override @@ -124,11 +126,10 @@ public void run() { consumer.seekToBeginning(topicPartitions); var offsetsInfo = new OffsetsInfo(consumer, topicId.topicName); - for (int emptyPolls = 0; !offsetsInfo.assignedPartitionsFullyPolled() - && emptyPolls < NO_MORE_DATA_EMPTY_POLLS_COUNT;) { + while (!offsetsInfo.assignedPartitionsFullyPolled() && !emptyPollsCounter.noDataEmptyCountsReached()) { var polled = consumer.poll(Duration.ofSeconds(3)); throttler.throttleAfterPoll(polled); - emptyPolls = polled.isEmpty() ? emptyPolls + 1 : 0; + emptyPollsCounter.count(polled); polled.forEach(r -> { totalStats.apply(r); partitionStats.get(r.partition()).apply(r); diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java index 79e81a180fc..e7b9edf834d 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java @@ -9,6 +9,7 @@ import com.provectus.kafka.ui.AbstractIntegrationTest; import com.provectus.kafka.ui.emitter.BackwardRecordEmitter; import com.provectus.kafka.ui.emitter.ForwardRecordEmitter; +import com.provectus.kafka.ui.emitter.PollingSettings; import com.provectus.kafka.ui.model.ConsumerPosition; import com.provectus.kafka.ui.model.TopicMessageEventDTO; import com.provectus.kafka.ui.producer.KafkaTestProducer; @@ -16,7 +17,6 @@ import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer; import com.provectus.kafka.ui.serdes.PropertyResolverImpl; import com.provectus.kafka.ui.serdes.builtin.StringSerde; -import com.provectus.kafka.ui.util.PollingThrottler; import java.io.Serializable; import java.util.ArrayList; import java.util.HashMap; @@ -112,7 +112,7 @@ void pollNothingOnEmptyTopic() { this::createConsumer, new ConsumerPosition(BEGINNING, EMPTY_TOPIC, null), RECORD_DESERIALIZER, - PollingThrottler.noop() + PollingSettings.createDefault() ); var backwardEmitter = new BackwardRecordEmitter( @@ -120,7 +120,7 @@ void pollNothingOnEmptyTopic() { new ConsumerPosition(BEGINNING, EMPTY_TOPIC, null), 100, RECORD_DESERIALIZER, - PollingThrottler.noop() + PollingSettings.createDefault() ); StepVerifier.create(Flux.create(forwardEmitter)) @@ -142,7 +142,7 @@ void pollFullTopicFromBeginning() { this::createConsumer, new ConsumerPosition(BEGINNING, TOPIC, null), RECORD_DESERIALIZER, - PollingThrottler.noop() + PollingSettings.createDefault() ); var backwardEmitter = new BackwardRecordEmitter( @@ -150,7 +150,7 @@ void pollFullTopicFromBeginning() { new ConsumerPosition(LATEST, TOPIC, null), PARTITIONS * MSGS_PER_PARTITION, RECORD_DESERIALIZER, - PollingThrottler.noop() + PollingSettings.createDefault() ); List expectedValues = SENT_RECORDS.stream().map(Record::getValue).collect(Collectors.toList()); @@ -171,7 +171,7 @@ void pollWithOffsets() { this::createConsumer, new ConsumerPosition(OFFSET, TOPIC, targetOffsets), RECORD_DESERIALIZER, - PollingThrottler.noop() + PollingSettings.createDefault() ); var backwardEmitter = new BackwardRecordEmitter( @@ -179,7 +179,7 @@ void pollWithOffsets() { new ConsumerPosition(OFFSET, TOPIC, targetOffsets), PARTITIONS * MSGS_PER_PARTITION, RECORD_DESERIALIZER, - PollingThrottler.noop() + PollingSettings.createDefault() ); var expectedValues = SENT_RECORDS.stream() @@ -216,7 +216,7 @@ void pollWithTimestamps() { this::createConsumer, new ConsumerPosition(TIMESTAMP, TOPIC, targetTimestamps), RECORD_DESERIALIZER, - PollingThrottler.noop() + PollingSettings.createDefault() ); var backwardEmitter = new BackwardRecordEmitter( @@ -224,7 +224,7 @@ void pollWithTimestamps() { new ConsumerPosition(TIMESTAMP, TOPIC, targetTimestamps), PARTITIONS * MSGS_PER_PARTITION, RECORD_DESERIALIZER, - PollingThrottler.noop() + PollingSettings.createDefault() ); var expectedValues = SENT_RECORDS.stream() @@ -255,7 +255,7 @@ void backwardEmitterSeekToEnd() { new ConsumerPosition(OFFSET, TOPIC, targetOffsets), numMessages, RECORD_DESERIALIZER, - PollingThrottler.noop() + PollingSettings.createDefault() ); var expectedValues = SENT_RECORDS.stream() @@ -281,7 +281,7 @@ void backwardEmitterSeekToBegin() { new ConsumerPosition(OFFSET, TOPIC, offsets), 100, RECORD_DESERIALIZER, - PollingThrottler.noop() + PollingSettings.createDefault() ); expectEmitter(backwardEmitter, diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/PollingThrottlerTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/PollingThrottlerTest.java index ab333cb11a4..2efe7562df9 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/PollingThrottlerTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/PollingThrottlerTest.java @@ -5,6 +5,7 @@ import com.google.common.base.Stopwatch; import com.google.common.util.concurrent.RateLimiter; +import com.provectus.kafka.ui.emitter.PollingThrottler; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.Test; From 4b88e28437a1c8450dda1839a6df3aa967c61548 Mon Sep 17 00:00:00 2001 From: iliax Date: Fri, 17 Mar 2023 13:44:42 +0400 Subject: [PATCH 2/4] naming fixes --- .../com/provectus/kafka/ui/config/ClustersProperties.java | 1 - .../com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java | 2 +- .../com/provectus/kafka/ui/emitter/EmptyPollsCounter.java | 2 +- .../com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java | 2 +- .../java/com/provectus/kafka/ui/emitter/PollingSettings.java | 4 ++-- .../kafka/ui/service/analyze/TopicAnalysisService.java | 2 +- 6 files changed, 6 insertions(+), 7 deletions(-) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java index e6dc84ad89b..461fc1b648f 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java @@ -52,7 +52,6 @@ public static class Cluster { @Data public static class PollingProperties { Integer pollTimeoutMs; - Integer topicPollTimeoutMs; Integer partitionPollTimeout; Integer noDataEmptyPolls; } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java index c4f6bac24b9..42f94a1e019 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java @@ -107,7 +107,7 @@ private List> partitionPollIteration( EmptyPollsCounter emptyPolls = pollingSettings.createEmptyPollsCounter(); while (!sink.isCancelled() && recordsToSend.size() < desiredMsgsToPoll - && !emptyPolls.noDataEmptyCountsReached()) { + && !emptyPolls.noDataEmptyPollsReached()) { var polledRecords = poll(sink, consumer, pollingSettings.getPartitionPollTimeout()); emptyPolls.count(polledRecords); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/EmptyPollsCounter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/EmptyPollsCounter.java index 13ab71e5944..3bc2ca38c1d 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/EmptyPollsCounter.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/EmptyPollsCounter.java @@ -21,7 +21,7 @@ public void count(ConsumerRecords polled) { emptyPolls = polled.isEmpty() ? emptyPolls + 1 : 0; } - public boolean noDataEmptyCountsReached() { + public boolean noDataEmptyPollsReached() { return emptyPolls >= maxEmptyPolls; } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java index 26b297150c7..971e2f7c9c4 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java @@ -41,7 +41,7 @@ public void accept(FluxSink sink) { EmptyPollsCounter emptyPolls = pollingSettings.createEmptyPollsCounter(); while (!sink.isCancelled() && !seekOperations.assignedPartitionsFullyPolled() - && !emptyPolls.noDataEmptyCountsReached()) { + && !emptyPolls.noDataEmptyPollsReached()) { sendPhase(sink, "Polling"); ConsumerRecords records = poll(sink, consumer); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/PollingSettings.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/PollingSettings.java index 10fdb6e5b21..0c3dfcfbab4 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/PollingSettings.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/PollingSettings.java @@ -30,14 +30,14 @@ public static PollingSettings create(ClustersProperties.Cluster cluster, ? Duration.ofMillis(pollingProps.getPartitionPollTimeout()) : Duration.ofMillis(pollTimeout.toMillis() / 5); - int notDataEmptyPolls = pollingProps.getNoDataEmptyPolls() != null + int noDataEmptyPolls = pollingProps.getNoDataEmptyPolls() != null ? pollingProps.getNoDataEmptyPolls() : DEFAULT_NO_DATA_EMPTY_POLLS; return new PollingSettings( pollTimeout, partitionPollTimeout, - notDataEmptyPolls, + noDataEmptyPolls, PollingThrottler.throttlerSupplier(cluster) ); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisService.java index 47bc1305646..3465dc3554b 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisService.java @@ -126,7 +126,7 @@ public void run() { consumer.seekToBeginning(topicPartitions); var offsetsInfo = new OffsetsInfo(consumer, topicId.topicName); - while (!offsetsInfo.assignedPartitionsFullyPolled() && !emptyPollsCounter.noDataEmptyCountsReached()) { + while (!offsetsInfo.assignedPartitionsFullyPolled() && !emptyPollsCounter.noDataEmptyPollsReached()) { var polled = consumer.poll(Duration.ofSeconds(3)); throttler.throttleAfterPoll(polled); emptyPollsCounter.count(polled); From 541940995e879520abeecdd50bcfd12fd2453360 Mon Sep 17 00:00:00 2001 From: iliax Date: Fri, 17 Mar 2023 13:46:28 +0400 Subject: [PATCH 3/4] checkstyle fixes --- .../java/com/provectus/kafka/ui/service/MessagesService.java | 2 +- .../kafka/ui/service/analyze/TopicAnalysisService.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java index 0b54c9d7835..27f751ac80a 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java @@ -5,6 +5,7 @@ import com.provectus.kafka.ui.emitter.ForwardRecordEmitter; import com.provectus.kafka.ui.emitter.MessageFilterStats; import com.provectus.kafka.ui.emitter.MessageFilters; +import com.provectus.kafka.ui.emitter.ResultSizeLimiter; import com.provectus.kafka.ui.emitter.TailingEmitter; import com.provectus.kafka.ui.exception.TopicNotFoundException; import com.provectus.kafka.ui.exception.ValidationException; @@ -17,7 +18,6 @@ import com.provectus.kafka.ui.serde.api.Serde; import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer; import com.provectus.kafka.ui.serdes.ProducerRecordCreator; -import com.provectus.kafka.ui.emitter.ResultSizeLimiter; import com.provectus.kafka.ui.util.SslPropertiesUtil; import java.util.List; import java.util.Map; diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisService.java index 3465dc3554b..7ea7a165983 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisService.java @@ -3,12 +3,12 @@ import com.provectus.kafka.ui.emitter.EmptyPollsCounter; import com.provectus.kafka.ui.emitter.OffsetsInfo; import com.provectus.kafka.ui.emitter.PollingSettings; +import com.provectus.kafka.ui.emitter.PollingThrottler; import com.provectus.kafka.ui.exception.TopicAnalysisException; import com.provectus.kafka.ui.model.KafkaCluster; import com.provectus.kafka.ui.model.TopicAnalysisDTO; import com.provectus.kafka.ui.service.ConsumerGroupService; import com.provectus.kafka.ui.service.TopicsService; -import com.provectus.kafka.ui.emitter.PollingThrottler; import java.io.Closeable; import java.time.Duration; import java.time.Instant; From 6f94c2776918a9d42f5e069f8f888b43e9242fd7 Mon Sep 17 00:00:00 2001 From: iliax Date: Mon, 20 Mar 2023 15:37:40 +0400 Subject: [PATCH 4/4] polling properties added to wizard api --- .../src/main/resources/swagger/kafka-ui-api.yaml | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index 71c595e5251..ea335f282cf 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -3445,6 +3445,15 @@ components: kafka: type: object properties: + polling: + type: object + properties: + pollTimeoutMs: + type: integer + partitionPollTimeout: + type: integer + noDataEmptyPolls: + type: integer clusters: type: array items: