Skip to content

Commit

Permalink
fix: Always dec. concurrency counter, even hitting rate limits
Browse files Browse the repository at this point in the history
  • Loading branch information
AlanConfluent committed Sep 23, 2021
1 parent 5686515 commit 704b30b
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,11 @@
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PullQueryResult {
private static final Logger LOG = LoggerFactory.getLogger(PullQueryResult.class);

private final LogicalSchema schema;
private final PullQueryQueuePopulator populator;
Expand Down Expand Up @@ -87,16 +90,26 @@ public PullQueryQueue getPullQueryQueue() {
public void start() {
Preconditions.checkState(!started, "Should only start once");
started = true;
final CompletableFuture<Void> f = populator.run();
f.exceptionally(t -> {
try {
final CompletableFuture<Void> f = populator.run();
f.exceptionally(t -> {
future.completeExceptionally(t);
return null;
});
f.thenAccept(future::complete);
} catch (final Throwable t) {
future.completeExceptionally(t);
return null;
});
f.thenAccept(future::complete);
throw t;
}
}

public void stop() {
pullQueryQueue.close();
try {
pullQueryQueue.close();
} catch (final Throwable t) {
LOG.error("Error closing pull query queue", t);
}
future.complete(null);
}

public void onException(final Consumer<Throwable> consumer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,11 +296,13 @@ private QueryPublisher createTablePullQueryPublisher(
statement.getSessionConfig().getOverrides()
);

PullQueryExecutionUtil.checkRateLimit(rateLimiter);
final Decrementer decrementer = pullConcurrencyLimiter.increment();
pullBandRateLimiter.allow(KsqlQueryType.PULL);

Decrementer decrementer = null;
try {
PullQueryExecutionUtil.checkRateLimit(rateLimiter);
decrementer = pullConcurrencyLimiter.increment();
pullBandRateLimiter.allow(KsqlQueryType.PULL);
final Decrementer finalDecrementer = decrementer;

final PullQueryResult result = ksqlEngine.executeTablePullQuery(
analysis,
serviceContext,
Expand All @@ -314,7 +316,7 @@ private QueryPublisher createTablePullQueryPublisher(

resultForMetrics.set(result);
result.onCompletionOrException((v, throwable) -> {
decrementer.decrementAtMostOnce();
finalDecrementer.decrementAtMostOnce();
});

final BlockingQueryPublisher publisher = new BlockingQueryPublisher(context, workerExecutor);
Expand All @@ -323,7 +325,9 @@ private QueryPublisher createTablePullQueryPublisher(

return publisher;
} catch (Throwable t) {
decrementer.decrementAtMostOnce();
if (decrementer != null) {
decrementer.decrementAtMostOnce();
}
throw t;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,14 @@ public synchronized void subscribe(final Subscriber<Collection<StreamedRow>> sub
query.getSessionConfig().getOverrides()
);

PullQueryExecutionUtil.checkRateLimit(rateLimiter);
final Decrementer decrementer = concurrencyLimiter.increment();
pullBandRateLimiter.allow(KsqlQueryType.PULL);

PullQueryResult result = null;
Decrementer decrementer = null;
try {
PullQueryExecutionUtil.checkRateLimit(rateLimiter);
decrementer = concurrencyLimiter.increment();
pullBandRateLimiter.allow(KsqlQueryType.PULL);
final Decrementer finalDecrementer = decrementer;

result = ksqlEngine.executeTablePullQuery(
analysis,
serviceContext,
Expand All @@ -127,7 +129,7 @@ public synchronized void subscribe(final Subscriber<Collection<StreamedRow>> sub

final PullQueryResult finalResult = result;
result.onCompletionOrException((v, throwable) -> {
decrementer.decrementAtMostOnce();
finalDecrementer.decrementAtMostOnce();

pullQueryMetrics.ifPresent(m -> {
recordMetrics(m, finalResult);
Expand All @@ -139,7 +141,9 @@ public synchronized void subscribe(final Subscriber<Collection<StreamedRow>> sub

subscriber.onSubscribe(subscription);
} catch (Throwable t) {
decrementer.decrementAtMostOnce();
if (decrementer != null) {
decrementer.decrementAtMostOnce();
}

if (result == null) {
pullQueryMetrics.ifPresent(this::recordErrorMetrics);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,10 +315,13 @@ private EndpointResponse handleStatement(
statement.getClass().getName()));
}
} catch (final TopicAuthorizationException e) {
log.error("Cannot access Kafka topic", e);
return errorHandler.accessDeniedFromKafkaResponse(e);
} catch (final KsqlStatementException e) {
log.error("Cannot execute statement", e);
return Errors.badStatement(e.getRawMessage(), e.getSqlStatement());
} catch (final KsqlException e) {
log.error("Error executing pull query", e);
return errorHandler.generateResponse(e, Errors.badRequest(e));
}
}
Expand Down Expand Up @@ -491,15 +494,14 @@ private EndpointResponse handleTablePullQuery(

// Only check the rate limit at the forwarding host
Decrementer decrementer = null;
if (!isAlreadyForwarded) {
PullQueryExecutionUtil.checkRateLimit(rateLimiter);
decrementer = concurrencyLimiter.increment();
}
pullBandRateLimiter.allow(KsqlQueryType.PULL);

final Optional<Decrementer> optionalDecrementer = Optional.ofNullable(decrementer);

try {
if (!isAlreadyForwarded) {
PullQueryExecutionUtil.checkRateLimit(rateLimiter);
decrementer = concurrencyLimiter.increment();
}
pullBandRateLimiter.allow(KsqlQueryType.PULL);

final Optional<Decrementer> optionalDecrementer = Optional.ofNullable(decrementer);
final PullQueryResult result = ksqlEngine.executeTablePullQuery(
analysis,
serviceContext,
Expand All @@ -525,7 +527,9 @@ private EndpointResponse handleTablePullQuery(

return EndpointResponse.ok(pullQueryStreamWriter);
} catch (final Throwable t) {
optionalDecrementer.ifPresent(Decrementer::decrementAtMostOnce);
if (decrementer != null) {
decrementer.decrementAtMostOnce();
}
throw t;
}
}
Expand Down Expand Up @@ -659,14 +663,6 @@ private EndpointResponse handlePushQuery(
return EndpointResponse.ok(queryStreamWriter);
}

private static String writeValueAsString(final Object object) {
try {
return OBJECT_MAPPER.writeValueAsString(object);
} catch (final JsonProcessingException e) {
throw new RuntimeException(e);
}
}

private EndpointResponse handlePrintTopic(
final ServiceContext serviceContext,
final Map<String, Object> streamProperties,
Expand Down

0 comments on commit 704b30b

Please sign in to comment.