Polling timeouts made configurable #3513
Merged (5 commits, Mar 21, 2023)
Changes from all commits
@@ -27,6 +27,8 @@ public class ClustersProperties {

String internalTopicPrefix;

PollingProperties polling = new PollingProperties();

@Data
public static class Cluster {
String name;
@@ -49,6 +51,13 @@ public static class Cluster {
TruststoreConfig ssl;
}

@Data
public static class PollingProperties {
Integer pollTimeoutMs;
Integer partitionPollTimeout;
Integer noDataEmptyPolls;
}

@Data
@ToString(exclude = "password")
public static class MetricsConfigData {
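For reference, the new settings above can be supplied through the application config. A minimal application.yml sketch, assuming ClustersProperties is bound under the kafka prefix (an assumption; check the @ConfigurationProperties prefix in your build) and using the field names of PollingProperties; timeout values are in milliseconds:

kafka:
  polling:
    pollTimeoutMs: 2000        # consumer poll timeout used by the emitters (default 1000)
    partitionPollTimeout: 400  # per-partition poll timeout for backward polling (default pollTimeoutMs / 5)
    noDataEmptyPolls: 5        # consecutive empty polls treated as end of data (default 3)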
@@ -4,7 +4,6 @@
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import com.provectus.kafka.ui.model.TopicMessagePhaseDTO;
import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
import com.provectus.kafka.ui.util.PollingThrottler;
import java.time.Duration;
import java.time.Instant;
import org.apache.kafka.clients.consumer.Consumer;
@@ -14,27 +13,21 @@
import reactor.core.publisher.FluxSink;

public abstract class AbstractEmitter {
private static final Duration DEFAULT_POLL_TIMEOUT_MS = Duration.ofMillis(1000L);

// In some situations it is hard to say whether records range (between two offsets) was fully polled.
// This happens when we have holes in records sequences that is usual case for compact topics or
// topics with transactional writes. In such cases if you want to poll all records between offsets X and Y
// there is no guarantee that you will ever see record with offset Y.
// To workaround this we can assume that after N consecutive empty polls all target messages were read.
public static final int NO_MORE_DATA_EMPTY_POLLS_COUNT = 3;

private final ConsumerRecordDeserializer recordDeserializer;
private final ConsumingStats consumingStats = new ConsumingStats();
private final PollingThrottler throttler;
protected final PollingSettings pollingSettings;

protected AbstractEmitter(ConsumerRecordDeserializer recordDeserializer, PollingThrottler throttler) {
protected AbstractEmitter(ConsumerRecordDeserializer recordDeserializer, PollingSettings pollingSettings) {
this.recordDeserializer = recordDeserializer;
this.throttler = throttler;
this.pollingSettings = pollingSettings;
this.throttler = pollingSettings.getPollingThrottler();
}

protected ConsumerRecords<Bytes, Bytes> poll(
FluxSink<TopicMessageEventDTO> sink, Consumer<Bytes, Bytes> consumer) {
return poll(sink, consumer, DEFAULT_POLL_TIMEOUT_MS);
return poll(sink, consumer, pollingSettings.getPollTimeout());
}

protected ConsumerRecords<Bytes, Bytes> poll(
@@ -3,15 +3,12 @@
import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
import com.provectus.kafka.ui.util.PollingThrottler;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.TreeMap;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -26,8 +23,6 @@ public class BackwardRecordEmitter
extends AbstractEmitter
implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {

private static final Duration POLL_TIMEOUT = Duration.ofMillis(200);

private final Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier;
private final ConsumerPosition consumerPosition;
private final int messagesPerPage;
@@ -37,8 +32,8 @@ public BackwardRecordEmitter(
ConsumerPosition consumerPosition,
int messagesPerPage,
ConsumerRecordDeserializer recordDeserializer,
PollingThrottler throttler) {
super(recordDeserializer, throttler);
PollingSettings pollingSettings) {
super(recordDeserializer, pollingSettings);
this.consumerPosition = consumerPosition;
this.messagesPerPage = messagesPerPage;
this.consumerSupplier = consumerSupplier;
@@ -109,17 +104,18 @@ private List<ConsumerRecord<Bytes, Bytes>> partitionPollIteration(

var recordsToSend = new ArrayList<ConsumerRecord<Bytes, Bytes>>();

// we use empty polls counting to verify that partition was fully read
for (int emptyPolls = 0; recordsToSend.size() < desiredMsgsToPoll && emptyPolls < NO_MORE_DATA_EMPTY_POLLS_COUNT;) {
var polledRecords = poll(sink, consumer, POLL_TIMEOUT);
log.debug("{} records polled from {}", polledRecords.count(), tp);
EmptyPollsCounter emptyPolls = pollingSettings.createEmptyPollsCounter();
while (!sink.isCancelled()
&& recordsToSend.size() < desiredMsgsToPoll
&& !emptyPolls.noDataEmptyPollsReached()) {
var polledRecords = poll(sink, consumer, pollingSettings.getPartitionPollTimeout());
emptyPolls.count(polledRecords);

// counting sequential empty polls
emptyPolls = polledRecords.isEmpty() ? emptyPolls + 1 : 0;
log.debug("{} records polled from {}", polledRecords.count(), tp);

var filteredRecords = polledRecords.records(tp).stream()
.filter(r -> r.offset() < toOffset)
.collect(Collectors.toList());
.toList();

if (!polledRecords.isEmpty() && filteredRecords.isEmpty()) {
// we already read all messages in target offsets interval
@@ -0,0 +1,28 @@
package com.provectus.kafka.ui.emitter;

import org.apache.kafka.clients.consumer.ConsumerRecords;

// In some situations it is hard to say whether a range of records (between two offsets) was fully polled.
// This happens when there are holes in the record sequence, which is common for compacted topics or
// topics with transactional writes. In such cases, if you want to poll all records between offsets X and Y,
// there is no guarantee that you will ever see a record with offset Y.
// To work around this we assume that after N consecutive empty polls all target messages have been read.
public class EmptyPollsCounter {

private final int maxEmptyPolls;

private int emptyPolls = 0;

EmptyPollsCounter(int maxEmptyPolls) {
this.maxEmptyPolls = maxEmptyPolls;
}

public void count(ConsumerRecords<?, ?> polled) {
emptyPolls = polled.isEmpty() ? emptyPolls + 1 : 0;
}

public boolean noDataEmptyPollsReached() {
return emptyPolls >= maxEmptyPolls;
}

}
@@ -3,7 +3,6 @@
import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
import com.provectus.kafka.ui.util.PollingThrottler;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -25,8 +24,8 @@ public ForwardRecordEmitter(
Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier,
ConsumerPosition position,
ConsumerRecordDeserializer recordDeserializer,
PollingThrottler throttler) {
super(recordDeserializer, throttler);
PollingSettings pollingSettings) {
super(recordDeserializer, pollingSettings);
this.position = position;
this.consumerSupplier = consumerSupplier;
}
@@ -39,16 +38,16 @@ public void accept(FluxSink<TopicMessageEventDTO> sink) {
var seekOperations = SeekOperations.create(consumer, position);
seekOperations.assignAndSeekNonEmptyPartitions();

// we use empty polls counting to verify that topic was fully read
int emptyPolls = 0;
EmptyPollsCounter emptyPolls = pollingSettings.createEmptyPollsCounter();
while (!sink.isCancelled()
&& !seekOperations.assignedPartitionsFullyPolled()
&& emptyPolls < NO_MORE_DATA_EMPTY_POLLS_COUNT) {
&& !emptyPolls.noDataEmptyPollsReached()) {

sendPhase(sink, "Polling");
ConsumerRecords<Bytes, Bytes> records = poll(sink, consumer);
emptyPolls.count(records);

log.debug("{} records polled", records.count());
emptyPolls = records.isEmpty() ? emptyPolls + 1 : 0;

for (ConsumerRecord<Bytes, Bytes> msg : records) {
if (!sink.isCancelled()) {
@@ -0,0 +1,79 @@
package com.provectus.kafka.ui.emitter;

import com.provectus.kafka.ui.config.ClustersProperties;
import java.time.Duration;
import java.util.Optional;
import java.util.function.Supplier;

public class PollingSettings {

private static final Duration DEFAULT_POLL_TIMEOUT = Duration.ofMillis(1_000);
private static final Duration DEFAULT_PARTITION_POLL_TIMEOUT = Duration.ofMillis(200);
private static final int DEFAULT_NO_DATA_EMPTY_POLLS = 3;

private final Duration pollTimeout;
private final Duration partitionPollTimeout;
private final int noDataEmptyPolls; // see EmptyPollsCounter docs

private final Supplier<PollingThrottler> throttlerSupplier;

public static PollingSettings create(ClustersProperties.Cluster cluster,
ClustersProperties clustersProperties) {
var pollingProps = Optional.ofNullable(clustersProperties.getPolling())
.orElseGet(ClustersProperties.PollingProperties::new);

var pollTimeout = pollingProps.getPollTimeoutMs() != null
? Duration.ofMillis(pollingProps.getPollTimeoutMs())
: DEFAULT_POLL_TIMEOUT;

var partitionPollTimeout = pollingProps.getPartitionPollTimeout() != null
? Duration.ofMillis(pollingProps.getPartitionPollTimeout())
: Duration.ofMillis(pollTimeout.toMillis() / 5);

int noDataEmptyPolls = pollingProps.getNoDataEmptyPolls() != null
? pollingProps.getNoDataEmptyPolls()
: DEFAULT_NO_DATA_EMPTY_POLLS;

return new PollingSettings(
pollTimeout,
partitionPollTimeout,
noDataEmptyPolls,
PollingThrottler.throttlerSupplier(cluster)
);
}

public static PollingSettings createDefault() {
return new PollingSettings(
DEFAULT_POLL_TIMEOUT,
DEFAULT_PARTITION_POLL_TIMEOUT,
DEFAULT_NO_DATA_EMPTY_POLLS,
PollingThrottler::noop
);
}

private PollingSettings(Duration pollTimeout,
Duration partitionPollTimeout,
int noDataEmptyPolls,
Supplier<PollingThrottler> throttlerSupplier) {
this.pollTimeout = pollTimeout;
this.partitionPollTimeout = partitionPollTimeout;
this.noDataEmptyPolls = noDataEmptyPolls;
this.throttlerSupplier = throttlerSupplier;
}

public EmptyPollsCounter createEmptyPollsCounter() {
return new EmptyPollsCounter(noDataEmptyPolls);
}

public Duration getPollTimeout() {
return pollTimeout;
}

public Duration getPartitionPollTimeout() {
return partitionPollTimeout;
}

public PollingThrottler getPollingThrottler() {
return throttlerSupplier.get();
}
}
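With this resolution order, setting only pollTimeoutMs: 2000 gives a 2 s poll timeout, a 400 ms partition poll timeout (pollTimeoutMs / 5) and the default of 3 empty polls, while omitting the polling section entirely preserves the previous hard-coded behaviour: a 1 s poll timeout, a 200 ms partition poll timeout and 3 consecutive empty polls.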
@@ -1,8 +1,9 @@
package com.provectus.kafka.ui.util;
package com.provectus.kafka.ui.emitter;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.RateLimiter;
import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.util.ConsumerRecordsUtil;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -1,4 +1,4 @@
package com.provectus.kafka.ui.util;
package com.provectus.kafka.ui.emitter;

import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import java.util.concurrent.atomic.AtomicInteger;
@@ -3,7 +3,6 @@
import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
import com.provectus.kafka.ui.util.PollingThrottler;
import java.util.HashMap;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
@@ -22,8 +21,8 @@ public class TailingEmitter extends AbstractEmitter
public TailingEmitter(Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier,
ConsumerPosition consumerPosition,
ConsumerRecordDeserializer recordDeserializer,
PollingThrottler throttler) {
super(recordDeserializer, throttler);
PollingSettings pollingSettings) {
super(recordDeserializer, pollingSettings);
this.consumerSupplier = consumerSupplier;
this.consumerPosition = consumerPosition;
}
@@ -2,14 +2,13 @@

import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.connect.api.KafkaConnectClientApi;
import com.provectus.kafka.ui.emitter.PollingSettings;
import com.provectus.kafka.ui.service.ksql.KsqlApiClient;
import com.provectus.kafka.ui.service.masking.DataMasking;
import com.provectus.kafka.ui.sr.api.KafkaSrClientApi;
import com.provectus.kafka.ui.util.PollingThrottler;
import com.provectus.kafka.ui.util.ReactiveFailover;
import java.util.Map;
import java.util.Properties;
import java.util.function.Supplier;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Builder;
@@ -28,7 +27,7 @@ public class KafkaCluster {
private final boolean readOnly;
private final MetricsConfig metricsConfig;
private final DataMasking masking;
private final Supplier<PollingThrottler> throttler;
private final PollingSettings pollingSettings;
private final ReactiveFailover<KafkaSrClientApi> schemaRegistryClient;
private final Map<String, ReactiveFailover<KafkaConnectClientApi>> connectsClients;
private final ReactiveFailover<KsqlApiClient> ksqlClient;
@@ -14,7 +14,7 @@ public class ClustersStorage {

public ClustersStorage(ClustersProperties properties, KafkaClusterFactory factory) {
var builder = ImmutableMap.<String, KafkaCluster>builder();
properties.getClusters().forEach(c -> builder.put(c.getName(), factory.create(c)));
properties.getClusters().forEach(c -> builder.put(c.getName(), factory.create(properties, c)));
this.kafkaClusters = builder.build();
}

@@ -3,6 +3,7 @@
import com.provectus.kafka.ui.client.RetryingKafkaConnectClient;
import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.connect.api.KafkaConnectClientApi;
import com.provectus.kafka.ui.emitter.PollingSettings;
import com.provectus.kafka.ui.model.ApplicationPropertyValidationDTO;
import com.provectus.kafka.ui.model.ClusterConfigValidationDTO;
import com.provectus.kafka.ui.model.KafkaCluster;
@@ -12,7 +13,6 @@
import com.provectus.kafka.ui.sr.ApiClient;
import com.provectus.kafka.ui.sr.api.KafkaSrClientApi;
import com.provectus.kafka.ui.util.KafkaServicesValidation;
import com.provectus.kafka.ui.util.PollingThrottler;
import com.provectus.kafka.ui.util.ReactiveFailover;
import com.provectus.kafka.ui.util.WebClientConfigurator;
import java.util.HashMap;
@@ -41,15 +41,16 @@ public class KafkaClusterFactory {
@Value("${webclient.max-in-memory-buffer-size:20MB}")
private DataSize maxBuffSize;

public KafkaCluster create(ClustersProperties.Cluster clusterProperties) {
public KafkaCluster create(ClustersProperties properties,
ClustersProperties.Cluster clusterProperties) {
KafkaCluster.KafkaClusterBuilder builder = KafkaCluster.builder();

builder.name(clusterProperties.getName());
builder.bootstrapServers(clusterProperties.getBootstrapServers());
builder.properties(convertProperties(clusterProperties.getProperties()));
builder.readOnly(clusterProperties.isReadOnly());
builder.masking(DataMasking.create(clusterProperties.getMasking()));
builder.throttler(PollingThrottler.throttlerSupplier(clusterProperties));
builder.pollingSettings(PollingSettings.create(clusterProperties, properties));

if (schemaRegistryConfigured(clusterProperties)) {
builder.schemaRegistryClient(schemaRegistryClient(clusterProperties));