Skip to content

Commit

Permalink
fix: Always dec. concurrency counter, even hitting rate limits (#8165) (
Browse files Browse the repository at this point in the history
#8223)

* fix: Always dec. concurrency counter, even hitting rate limits (#8165)

* fix: Always dec. concurrency counter, even hitting rate limits

* chore: fix pull query compilation

Co-authored-by: Alan Sheinberg <[email protected]>
  • Loading branch information
cprasad1 and AlanConfluent authored Oct 5, 2021
1 parent e23b75c commit f15f186
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,11 @@
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PullQueryResult {
private static final Logger LOG = LoggerFactory.getLogger(PullQueryResult.class);

private final LogicalSchema schema;
private final PullQueryQueuePopulator populator;
Expand Down Expand Up @@ -87,22 +90,35 @@ public PullQueryQueue getPullQueryQueue() {
public void start() {
Preconditions.checkState(!started, "Should only start once");
started = true;
final CompletableFuture<Void> f = populator.run();
f.exceptionally(t -> {
try {
final CompletableFuture<Void> f = populator.run();
f.exceptionally(t -> {
future.completeExceptionally(t);
return null;
});
f.thenAccept(future::complete);
} catch (final Throwable t) {
future.completeExceptionally(t);
return null;
});
f.thenAccept(future::complete);
throw t;
}
// Register the error metric
onException(t ->
pullQueryMetrics.ifPresent(metrics ->
metrics.recordErrorRate(1, sourceType, planType, routingNodeType))
);
}

public void stop() {
pullQueryQueue.close();
try {
pullQueryQueue.close();
} catch (final Throwable t) {
LOG.error("Error closing pull query queue", t);
}
future.complete(null);
}

public void onException(final Consumer<Throwable> consumer) {
future.exceptionally(t -> {
pullQueryMetrics.ifPresent(metrics ->
metrics.recordErrorRate(1, sourceType, planType, routingNodeType));
consumer.accept(t);
return null;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,11 +241,13 @@ private QueryPublisher createPullQueryPublisher(
statement.getSessionConfig().getOverrides()
);

PullQueryExecutionUtil.checkRateLimit(rateLimiter);
final Decrementer decrementer = pullConcurrencyLimiter.increment();
pullBandRateLimiter.allow();

Decrementer decrementer = null;
try {
PullQueryExecutionUtil.checkRateLimit(rateLimiter);
decrementer = pullConcurrencyLimiter.increment();
pullBandRateLimiter.allow();
final Decrementer finalDecrementer = decrementer;

final PullQueryResult result = ksqlEngine.executePullQuery(
serviceContext,
statement,
Expand All @@ -258,7 +260,7 @@ private QueryPublisher createPullQueryPublisher(

resultForMetrics.set(result);
result.onCompletionOrException((v, throwable) -> {
decrementer.decrementAtMostOnce();
finalDecrementer.decrementAtMostOnce();
});

final BlockingQueryPublisher publisher = new BlockingQueryPublisher(context, workerExecutor);
Expand All @@ -267,7 +269,9 @@ private QueryPublisher createPullQueryPublisher(

return publisher;
} catch (Throwable t) {
decrementer.decrementAtMostOnce();
if (decrementer != null) {
decrementer.decrementAtMostOnce();
}
throw t;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.util.LinkedList;
import java.util.Queue;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* SlidingWindowRateLimiter keeps a log of timestamps and the size for each response returned by
Expand All @@ -36,6 +38,7 @@

public class SlidingWindowRateLimiter {

private static final Logger LOG = LoggerFactory.getLogger(SlidingWindowRateLimiter.class);
private static long NUM_BYTES_IN_ONE_MEGABYTE = 1 * 1024 * 1024;

/**
Expand Down Expand Up @@ -90,6 +93,8 @@ protected synchronized void allow(final long timestamp) throws KsqlException {
this.numBytesInWindow -= responseSizesLog.poll().right;
}
if (this.numBytesInWindow > throttleLimit) {
LOG.warn("Hit bandwidth rate limit of " + throttleLimit + "MB with use of "
+ numBytesInWindow + "MB");
throw new KsqlException("Host is at bandwidth rate limit for pull queries.");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,14 @@ public synchronized void subscribe(final Subscriber<Collection<StreamedRow>> sub
query.getSessionConfig().getOverrides()
);

PullQueryExecutionUtil.checkRateLimit(rateLimiter);
final Decrementer decrementer = concurrencyLimiter.increment();
pullBandRateLimiter.allow();

PullQueryResult result = null;
Decrementer decrementer = null;
try {
PullQueryExecutionUtil.checkRateLimit(rateLimiter);
decrementer = concurrencyLimiter.increment();
pullBandRateLimiter.allow();
final Decrementer finalDecrementer = decrementer;

result = ksqlEngine.executePullQuery(
serviceContext,
query,
Expand All @@ -121,7 +123,7 @@ public synchronized void subscribe(final Subscriber<Collection<StreamedRow>> sub

final PullQueryResult finalResult = result;
result.onCompletionOrException((v, throwable) -> {
decrementer.decrementAtMostOnce();
finalDecrementer.decrementAtMostOnce();

pullQueryMetrics.ifPresent(m -> {
recordMetrics(m, Optional.of(finalResult));
Expand All @@ -133,7 +135,9 @@ public synchronized void subscribe(final Subscriber<Collection<StreamedRow>> sub

subscriber.onSubscribe(subscription);
} catch (Throwable t) {
decrementer.decrementAtMostOnce();
if (decrementer != null) {
decrementer.decrementAtMostOnce();
}

if (result == null) {
pullQueryMetrics.ifPresent(m -> recordMetrics(m, Optional.empty()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

package io.confluent.ksql.rest.server.resources.streaming;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.RateLimiter;
Expand Down Expand Up @@ -404,15 +403,14 @@ private EndpointResponse handlePullQuery(

// Only check the rate limit at the forwarding host
Decrementer decrementer = null;
if (!isAlreadyForwarded) {
PullQueryExecutionUtil.checkRateLimit(rateLimiter);
decrementer = concurrencyLimiter.increment();
}
pullBandRateLimiter.allow();

final Optional<Decrementer> optionalDecrementer = Optional.ofNullable(decrementer);

try {
if (!isAlreadyForwarded) {
PullQueryExecutionUtil.checkRateLimit(rateLimiter);
decrementer = concurrencyLimiter.increment();
}
pullBandRateLimiter.allow();

final Optional<Decrementer> optionalDecrementer = Optional.ofNullable(decrementer);
final PullQueryResult result = ksqlEngine.executePullQuery(
serviceContext,
configured,
Expand All @@ -436,8 +434,10 @@ private EndpointResponse handlePullQuery(
connectionClosedFuture);

return EndpointResponse.ok(pullQueryStreamWriter);
} catch (Throwable t) {
optionalDecrementer.ifPresent(Decrementer::decrementAtMostOnce);
} catch (final Throwable t) {
if (decrementer != null) {
decrementer.decrementAtMostOnce();
}
throw t;
}
}
Expand Down Expand Up @@ -509,14 +509,6 @@ private EndpointResponse handlePushQuery(
return EndpointResponse.ok(queryStreamWriter);
}

private static String writeValueAsString(final Object object) {
try {
return OBJECT_MAPPER.writeValueAsString(object);
} catch (final JsonProcessingException e) {
throw new RuntimeException(e);
}
}

private EndpointResponse handlePrintTopic(
final ServiceContext serviceContext,
final Map<String, Object> streamProperties,
Expand Down

0 comments on commit f15f186

Please sign in to comment.