From 704b30b0b5d1172e86b8b3a360b5d1d526f0307e Mon Sep 17 00:00:00 2001 From: Alan Sheinberg Date: Thu, 23 Sep 2021 13:09:34 -0700 Subject: [PATCH] fix: Always dec. concurrency counter, even hitting rate limits --- .../ksql/physical/pull/PullQueryResult.java | 25 ++++++++++++---- .../ksql/api/impl/QueryEndpoint.java | 16 ++++++---- .../streaming/PullQueryPublisher.java | 16 ++++++---- .../streaming/StreamedQueryResource.java | 30 ++++++++----------- 4 files changed, 52 insertions(+), 35 deletions(-) diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/PullQueryResult.java b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/PullQueryResult.java index bc665e1e5892..6e668b29c9c9 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/PullQueryResult.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/PullQueryResult.java @@ -29,8 +29,11 @@ import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Supplier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class PullQueryResult { + private static final Logger LOG = LoggerFactory.getLogger(PullQueryResult.class); private final LogicalSchema schema; private final PullQueryQueuePopulator populator; @@ -87,16 +90,26 @@ public PullQueryQueue getPullQueryQueue() { public void start() { Preconditions.checkState(!started, "Should only start once"); started = true; - final CompletableFuture f = populator.run(); - f.exceptionally(t -> { + try { + final CompletableFuture f = populator.run(); + f.exceptionally(t -> { + future.completeExceptionally(t); + return null; + }); + f.thenAccept(future::complete); + } catch (final Throwable t) { future.completeExceptionally(t); - return null; - }); - f.thenAccept(future::complete); + throw t; + } } public void stop() { - pullQueryQueue.close(); + try { + pullQueryQueue.close(); + } catch (final Throwable t) { + LOG.error("Error closing pull query queue", t); + } + future.complete(null); } public void onException(final Consumer consumer) { diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java index 9a9ff8e4ea08..3b195d33c933 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java @@ -296,11 +296,13 @@ private QueryPublisher createTablePullQueryPublisher( statement.getSessionConfig().getOverrides() ); - PullQueryExecutionUtil.checkRateLimit(rateLimiter); - final Decrementer decrementer = pullConcurrencyLimiter.increment(); - pullBandRateLimiter.allow(KsqlQueryType.PULL); - + Decrementer decrementer = null; try { + PullQueryExecutionUtil.checkRateLimit(rateLimiter); + decrementer = pullConcurrencyLimiter.increment(); + pullBandRateLimiter.allow(KsqlQueryType.PULL); + final Decrementer finalDecrementer = decrementer; + final PullQueryResult result = ksqlEngine.executeTablePullQuery( analysis, serviceContext, @@ -314,7 +316,7 @@ private QueryPublisher createTablePullQueryPublisher( resultForMetrics.set(result); result.onCompletionOrException((v, throwable) -> { - decrementer.decrementAtMostOnce(); + finalDecrementer.decrementAtMostOnce(); }); final BlockingQueryPublisher publisher = new BlockingQueryPublisher(context, workerExecutor); @@ -323,7 +325,9 @@ private QueryPublisher createTablePullQueryPublisher( return publisher; } catch (Throwable t) { - decrementer.decrementAtMostOnce(); + if (decrementer != null) { + decrementer.decrementAtMostOnce(); + } throw t; } } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java index f9b2f49bcf1c..b50a47c03fd5 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java @@ -108,12 +108,14 @@ public synchronized void subscribe(final Subscriber> sub query.getSessionConfig().getOverrides() ); - PullQueryExecutionUtil.checkRateLimit(rateLimiter); - final Decrementer decrementer = concurrencyLimiter.increment(); - pullBandRateLimiter.allow(KsqlQueryType.PULL); - PullQueryResult result = null; + Decrementer decrementer = null; try { + PullQueryExecutionUtil.checkRateLimit(rateLimiter); + decrementer = concurrencyLimiter.increment(); + pullBandRateLimiter.allow(KsqlQueryType.PULL); + final Decrementer finalDecrementer = decrementer; + result = ksqlEngine.executeTablePullQuery( analysis, serviceContext, @@ -127,7 +129,7 @@ public synchronized void subscribe(final Subscriber> sub final PullQueryResult finalResult = result; result.onCompletionOrException((v, throwable) -> { - decrementer.decrementAtMostOnce(); + finalDecrementer.decrementAtMostOnce(); pullQueryMetrics.ifPresent(m -> { recordMetrics(m, finalResult); @@ -139,7 +141,9 @@ public synchronized void subscribe(final Subscriber> sub subscriber.onSubscribe(subscription); } catch (Throwable t) { - decrementer.decrementAtMostOnce(); + if (decrementer != null) { + decrementer.decrementAtMostOnce(); + } if (result == null) { pullQueryMetrics.ifPresent(this::recordErrorMetrics); diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java index 021111fb83d5..35bb428bd468 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java @@ -315,10 +315,13 @@ private EndpointResponse handleStatement( statement.getClass().getName())); } } catch (final TopicAuthorizationException e) { + log.error("Cannot access Kafka topic", e); return errorHandler.accessDeniedFromKafkaResponse(e); } catch (final KsqlStatementException e) { + log.error("Cannot execute statement", e); return Errors.badStatement(e.getRawMessage(), e.getSqlStatement()); } catch (final KsqlException e) { + log.error("Error executing pull query", e); return errorHandler.generateResponse(e, Errors.badRequest(e)); } } @@ -491,15 +494,14 @@ private EndpointResponse handleTablePullQuery( // Only check the rate limit at the forwarding host Decrementer decrementer = null; - if (!isAlreadyForwarded) { - PullQueryExecutionUtil.checkRateLimit(rateLimiter); - decrementer = concurrencyLimiter.increment(); - } - pullBandRateLimiter.allow(KsqlQueryType.PULL); - - final Optional optionalDecrementer = Optional.ofNullable(decrementer); - try { + if (!isAlreadyForwarded) { + PullQueryExecutionUtil.checkRateLimit(rateLimiter); + decrementer = concurrencyLimiter.increment(); + } + pullBandRateLimiter.allow(KsqlQueryType.PULL); + + final Optional optionalDecrementer = Optional.ofNullable(decrementer); final PullQueryResult result = ksqlEngine.executeTablePullQuery( analysis, serviceContext, @@ -525,7 +527,9 @@ private EndpointResponse handleTablePullQuery( return EndpointResponse.ok(pullQueryStreamWriter); } catch (final Throwable t) { - optionalDecrementer.ifPresent(Decrementer::decrementAtMostOnce); + if (decrementer != null) { + decrementer.decrementAtMostOnce(); + } throw t; } } @@ -659,14 +663,6 @@ private EndpointResponse handlePushQuery( return EndpointResponse.ok(queryStreamWriter); } - private static String writeValueAsString(final Object object) { - try { - return OBJECT_MAPPER.writeValueAsString(object); - } catch (final JsonProcessingException e) { - throw new RuntimeException(e); - } - } - private EndpointResponse handlePrintTopic( final ServiceContext serviceContext, final Map streamProperties,