From f15f18678975245d35530527632a03134385ba91 Mon Sep 17 00:00:00 2001 From: cprasad1 Date: Mon, 4 Oct 2021 18:43:33 -0700 Subject: [PATCH] fix: Always dec. concurrency counter, even hitting rate limits (#8165) (#8223) * fix: Always dec. concurrency counter, even hitting rate limits (#8165) * fix: Always dec. concurrency counter, even hitting rate limits * chore: fix pull query compilation Co-authored-by: Alan Sheinberg <57688982+AlanConfluent@users.noreply.github.com> --- .../ksql/physical/pull/PullQueryResult.java | 32 ++++++++++++++----- .../ksql/api/impl/QueryEndpoint.java | 16 ++++++---- .../api/server/SlidingWindowRateLimiter.java | 5 +++ .../streaming/PullQueryPublisher.java | 16 ++++++---- .../streaming/StreamedQueryResource.java | 30 +++++++---------- 5 files changed, 60 insertions(+), 39 deletions(-) diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/PullQueryResult.java b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/PullQueryResult.java index bc665e1e5892..8c57c90825c0 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/PullQueryResult.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/PullQueryResult.java @@ -29,8 +29,11 @@ import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Supplier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class PullQueryResult { + private static final Logger LOG = LoggerFactory.getLogger(PullQueryResult.class); private final LogicalSchema schema; private final PullQueryQueuePopulator populator; @@ -87,22 +90,35 @@ public PullQueryQueue getPullQueryQueue() { public void start() { Preconditions.checkState(!started, "Should only start once"); started = true; - final CompletableFuture f = populator.run(); - f.exceptionally(t -> { + try { + final CompletableFuture f = populator.run(); + f.exceptionally(t -> { + future.completeExceptionally(t); + return null; + }); + f.thenAccept(future::complete); + } catch (final Throwable t) { future.completeExceptionally(t); - return null; - }); - f.thenAccept(future::complete); + throw t; + } + // Register the error metric + onException(t -> + pullQueryMetrics.ifPresent(metrics -> + metrics.recordErrorRate(1, sourceType, planType, routingNodeType)) + ); } public void stop() { - pullQueryQueue.close(); + try { + pullQueryQueue.close(); + } catch (final Throwable t) { + LOG.error("Error closing pull query queue", t); + } + future.complete(null); } public void onException(final Consumer consumer) { future.exceptionally(t -> { - pullQueryMetrics.ifPresent(metrics -> - metrics.recordErrorRate(1, sourceType, planType, routingNodeType)); consumer.accept(t); return null; }); diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java index e5b6a768ab04..44bffea53291 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java @@ -241,11 +241,13 @@ private QueryPublisher createPullQueryPublisher( statement.getSessionConfig().getOverrides() ); - PullQueryExecutionUtil.checkRateLimit(rateLimiter); - final Decrementer decrementer = pullConcurrencyLimiter.increment(); - pullBandRateLimiter.allow(); - + Decrementer decrementer = null; try { + PullQueryExecutionUtil.checkRateLimit(rateLimiter); + decrementer = pullConcurrencyLimiter.increment(); + pullBandRateLimiter.allow(); + final Decrementer finalDecrementer = decrementer; + final PullQueryResult result = ksqlEngine.executePullQuery( serviceContext, statement, @@ -258,7 +260,7 @@ private QueryPublisher createPullQueryPublisher( resultForMetrics.set(result); result.onCompletionOrException((v, throwable) -> { - decrementer.decrementAtMostOnce(); + finalDecrementer.decrementAtMostOnce(); }); final BlockingQueryPublisher publisher = new BlockingQueryPublisher(context, workerExecutor); @@ -267,7 +269,9 @@ private QueryPublisher createPullQueryPublisher( return publisher; } catch (Throwable t) { - decrementer.decrementAtMostOnce(); + if (decrementer != null) { + decrementer.decrementAtMostOnce(); + } throw t; } } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/SlidingWindowRateLimiter.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/SlidingWindowRateLimiter.java index 7d049abb63bf..9a315cef0132 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/SlidingWindowRateLimiter.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/SlidingWindowRateLimiter.java @@ -23,6 +23,8 @@ import java.util.LinkedList; import java.util.Queue; import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * SlidingWindowRateLimiter keeps a log of timestamps and the size for each response returned by @@ -36,6 +38,7 @@ public class SlidingWindowRateLimiter { + private static final Logger LOG = LoggerFactory.getLogger(SlidingWindowRateLimiter.class); private static long NUM_BYTES_IN_ONE_MEGABYTE = 1 * 1024 * 1024; /** @@ -90,6 +93,8 @@ protected synchronized void allow(final long timestamp) throws KsqlException { this.numBytesInWindow -= responseSizesLog.poll().right; } if (this.numBytesInWindow > throttleLimit) { + LOG.warn("Hit bandwidth rate limit of " + throttleLimit + "MB with use of " + + numBytesInWindow + "MB"); throw new KsqlException("Host is at bandwidth rate limit for pull queries."); } } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java index b8753c984067..a8c8181d6ffb 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java @@ -103,12 +103,14 @@ public synchronized void subscribe(final Subscriber> sub query.getSessionConfig().getOverrides() ); - PullQueryExecutionUtil.checkRateLimit(rateLimiter); - final Decrementer decrementer = concurrencyLimiter.increment(); - pullBandRateLimiter.allow(); - PullQueryResult result = null; + Decrementer decrementer = null; try { + PullQueryExecutionUtil.checkRateLimit(rateLimiter); + decrementer = concurrencyLimiter.increment(); + pullBandRateLimiter.allow(); + final Decrementer finalDecrementer = decrementer; + result = ksqlEngine.executePullQuery( serviceContext, query, @@ -121,7 +123,7 @@ public synchronized void subscribe(final Subscriber> sub final PullQueryResult finalResult = result; result.onCompletionOrException((v, throwable) -> { - decrementer.decrementAtMostOnce(); + finalDecrementer.decrementAtMostOnce(); pullQueryMetrics.ifPresent(m -> { recordMetrics(m, Optional.of(finalResult)); @@ -133,7 +135,9 @@ public synchronized void subscribe(final Subscriber> sub subscriber.onSubscribe(subscription); } catch (Throwable t) { - decrementer.decrementAtMostOnce(); + if (decrementer != null) { + decrementer.decrementAtMostOnce(); + } if (result == null) { pullQueryMetrics.ifPresent(m -> recordMetrics(m, Optional.empty())); diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java index 75c3de3c177d..a4c804cabc18 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java @@ -15,7 +15,6 @@ package io.confluent.ksql.rest.server.resources.streaming; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.RateLimiter; @@ -404,15 +403,14 @@ private EndpointResponse handlePullQuery( // Only check the rate limit at the forwarding host Decrementer decrementer = null; - if (!isAlreadyForwarded) { - PullQueryExecutionUtil.checkRateLimit(rateLimiter); - decrementer = concurrencyLimiter.increment(); - } - pullBandRateLimiter.allow(); - - final Optional optionalDecrementer = Optional.ofNullable(decrementer); - try { + if (!isAlreadyForwarded) { + PullQueryExecutionUtil.checkRateLimit(rateLimiter); + decrementer = concurrencyLimiter.increment(); + } + pullBandRateLimiter.allow(); + + final Optional optionalDecrementer = Optional.ofNullable(decrementer); final PullQueryResult result = ksqlEngine.executePullQuery( serviceContext, configured, @@ -436,8 +434,10 @@ private EndpointResponse handlePullQuery( connectionClosedFuture); return EndpointResponse.ok(pullQueryStreamWriter); - } catch (Throwable t) { - optionalDecrementer.ifPresent(Decrementer::decrementAtMostOnce); + } catch (final Throwable t) { + if (decrementer != null) { + decrementer.decrementAtMostOnce(); + } throw t; } } @@ -509,14 +509,6 @@ private EndpointResponse handlePushQuery( return EndpointResponse.ok(queryStreamWriter); } - private static String writeValueAsString(final Object object) { - try { - return OBJECT_MAPPER.writeValueAsString(object); - } catch (final JsonProcessingException e) { - throw new RuntimeException(e); - } - } - private EndpointResponse handlePrintTopic( final ServiceContext serviceContext, final Map streamProperties,