Skip to content

Commit

Permalink
rohan's review
Browse files Browse the repository at this point in the history
  • Loading branch information
lct45 committed Mar 1, 2022
1 parent 3aedfb1 commit 181d762
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -397,10 +397,10 @@ public class KsqlRestConfig extends AbstractConfig {
+ "ksqlDB servers have mutual TLS enabled)";

public static final String KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG =
KSQL_CONFIG_PREFIX + "server.command.topic.rate.limit";
KSQL_CONFIG_PREFIX + "server.command.topic.rate.limit";
public static final double KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG_DEFAULT = Double.MAX_VALUE;
private static final String KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG_DEFAULT_DOC
= "Sets the number of statements that can be executed against the command topic per second";
private static final String KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG_DEFAULT_DOC =
"Sets the number of statements that can be executed against the command topic per second";

private static final ConfigDef CONFIG_DEF;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import org.apache.kafka.clients.producer.Producer;
Expand All @@ -63,7 +64,9 @@
* duration for the command to be executed remotely if configured with a
* {@code distributedCmdResponseTimeout}.
*/
// CHECKSTYLE_RULES.OFF: ClassDataAbstractionCoupling
public class DistributingExecutor {
// CHECKSTYLE_RULES.ON: ClassDataAbstractionCoupling
private final CommandQueue commandQueue;
private final Duration distributedCmdResponseTimeout;
private final BiFunction<KsqlExecutionContext, ServiceContext, Injector> injectorFactory;
Expand Down Expand Up @@ -203,10 +206,10 @@ public StatementExecutorResponse execute(
statement.getStatementText()), e);
}

if (!rateLimiter.tryAcquire()) {
if (!rateLimiter.tryAcquire(1, TimeUnit.SECONDS)) {
throw new KsqlRestException(
Errors.tooManyRequests(
"Too many writes to the command topic within a 1 second timeframe"
"DDL/DML rate is crossing the configured rate limit of statements/second"
));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ CommonCreateConfigs.VALUE_FORMAT_PROPERTY, new StringLiteral("json")
public void shouldThrowIfRateLimitHit() {
// Given:
final DistributingExecutor rateLimitedDistributor = new DistributingExecutor(
new KsqlConfig(ImmutableMap.of(KsqlRestConfig.KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG, 1.0)),
new KsqlConfig(ImmutableMap.of(KsqlRestConfig.KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG, 0.5)),
queue,
DURATION_10_MS,
(ec, sc) -> InjectorChain.of(schemaInjector, topicInjector),
Expand All @@ -528,6 +528,6 @@ public void shouldThrowIfRateLimitHit() {

assertEquals(e.getResponse().getStatus(), 429);
final KsqlErrorMessage errorMessage = (KsqlErrorMessage) e.getResponse().getEntity();
assertTrue(errorMessage.getMessage().contains("Too many writes to the command topic within a 1 second timeframe"));
assertTrue(errorMessage.getMessage().contains("DDL/DML rate is crossing the configured rate limit of statements/second"));
}
}

0 comments on commit 181d762

Please sign in to comment.