From 181d7623a0bf6e5ef644f51ccf294eb3a8360f18 Mon Sep 17 00:00:00 2001 From: Leah Thomas Date: Tue, 1 Mar 2022 11:36:53 -0600 Subject: [PATCH] rohan's review --- .../java/io/confluent/ksql/rest/server/KsqlRestConfig.java | 6 +++--- .../ksql/rest/server/computation/DistributingExecutor.java | 7 +++++-- .../rest/server/computation/DistributingExecutorTest.java | 4 ++-- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java index 6b9758e46593..55c60f354cca 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java @@ -397,10 +397,10 @@ public class KsqlRestConfig extends AbstractConfig { + "ksqlDB servers have mutual TLS enabled)"; public static final String KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG = - KSQL_CONFIG_PREFIX + "server.command.topic.rate.limit"; + KSQL_CONFIG_PREFIX + "server.command.topic.rate.limit"; public static final double KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG_DEFAULT = Double.MAX_VALUE; - private static final String KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG_DEFAULT_DOC - = "Sets the number of statements that can be executed against the command topic per second"; + private static final String KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG_DEFAULT_DOC = + "Sets the number of statements that can be executed against the command topic per second"; private static final ConfigDef CONFIG_DEF; diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java index 614d528e1228..3ba1665bbbda 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java @@ -49,6 +49,7 @@ import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.TimeUnit; import java.util.function.BiFunction; import java.util.function.Supplier; import org.apache.kafka.clients.producer.Producer; @@ -63,7 +64,9 @@ * duration for the command to be executed remotely if configured with a * {@code distributedCmdResponseTimeout}. */ +// CHECKSTYLE_RULES.OFF: ClassDataAbstractionCoupling public class DistributingExecutor { + // CHECKSTYLE_RULES.ON: ClassDataAbstractionCoupling private final CommandQueue commandQueue; private final Duration distributedCmdResponseTimeout; private final BiFunction injectorFactory; @@ -203,10 +206,10 @@ public StatementExecutorResponse execute( statement.getStatementText()), e); } - if (!rateLimiter.tryAcquire()) { + if (!rateLimiter.tryAcquire(1, TimeUnit.SECONDS)) { throw new KsqlRestException( Errors.tooManyRequests( - "Too many writes to the command topic within a 1 second timeframe" + "DDL/DML rate is crossing the configured rate limit of statements/second" )); } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/DistributingExecutorTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/DistributingExecutorTest.java index 1b1b0d7c6275..931cb34f81bb 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/DistributingExecutorTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/DistributingExecutorTest.java @@ -506,7 +506,7 @@ CommonCreateConfigs.VALUE_FORMAT_PROPERTY, new StringLiteral("json") public void shouldThrowIfRateLimitHit() { // Given: final DistributingExecutor rateLimitedDistributor = new DistributingExecutor( - new KsqlConfig(ImmutableMap.of(KsqlRestConfig.KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG, 1.0)), + new KsqlConfig(ImmutableMap.of(KsqlRestConfig.KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG, 0.5)), queue, DURATION_10_MS, (ec, sc) -> InjectorChain.of(schemaInjector, topicInjector), @@ -528,6 +528,6 @@ public void shouldThrowIfRateLimitHit() { assertEquals(e.getResponse().getStatus(), 429); final KsqlErrorMessage errorMessage = (KsqlErrorMessage) e.getResponse().getEntity(); - assertTrue(errorMessage.getMessage().contains("Too many writes to the command topic within a 1 second timeframe")); + assertTrue(errorMessage.getMessage().contains("DDL/DML rate is crossing the configured rate limit of statements/second")); } }