From b26d7b9763588c744ef4d48d33798b9819c33bfd Mon Sep 17 00:00:00 2001 From: Leah Thomas Date: Thu, 24 Feb 2022 10:42:12 -0600 Subject: [PATCH 1/8] feat: add rate-limiting to ksql command topic --- .../confluent/ksql/rest/server/computation/CommandRunner.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java index 855fab434a2b..054ab3026392 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java @@ -16,6 +16,7 @@ package io.confluent.ksql.rest.server.computation; import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.RateLimiter; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.confluent.ksql.rest.Errors; import io.confluent.ksql.rest.entity.ClusterTerminateRequest; @@ -90,6 +91,7 @@ public class CommandRunner implements Closeable { private final Supplier commandTopicExists; private boolean commandTopicDeleted; private Status state = new Status(CommandRunnerStatus.RUNNING, CommandRunnerDegradedReason.NONE); + private RateLimiter rateLimiter; public enum CommandRunnerStatus { RUNNING, @@ -218,6 +220,7 @@ public CommandRunner( Objects.requireNonNull(commandTopicExists, "commandTopicExists"); this.incompatibleCommandDetected = false; this.commandTopicDeleted = false; + this.rateLimiter = RateLimiter.create(500.0); } /** @@ -346,6 +349,7 @@ private void executeStatement(final QueuedCommand queuedCommand) { if (closed) { LOG.info("Execution aborted as system is closing down"); } else { + rateLimiter.acquire(); statementExecutor.handleStatement(queuedCommand); LOG.info("Executed statement: " + commandStatement); } From fba9fe4dbdaf655ecc2d7fedc86f37288cb07b6e Mon Sep 17 00:00:00 2001 From: Leah Thomas Date: Thu, 24 Feb 2022 11:17:28 -0600 Subject: [PATCH 2/8] make rate limit quota set by config --- .../confluent/ksql/config/ImmutableProperties.java | 1 + .../java/io/confluent/ksql/util/KsqlConfig.java | 13 +++++++++++++ .../ksql/rest/server/KsqlRestApplication.java | 3 ++- .../ksql/rest/server/computation/CommandRunner.java | 11 +++++++---- .../rest/server/computation/CommandRunnerTest.java | 4 +++- .../ksql/rest/server/computation/RecoveryTest.java | 3 ++- 6 files changed, 28 insertions(+), 7 deletions(-) diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/config/ImmutableProperties.java b/ksqldb-common/src/main/java/io/confluent/ksql/config/ImmutableProperties.java index 3022f2004194..2e0c69a02c21 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/config/ImmutableProperties.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/config/ImmutableProperties.java @@ -36,6 +36,7 @@ public final class ImmutableProperties { .add(KsqlConfig.KSQL_SOURCE_TABLE_MATERIALIZATION_ENABLED) .add(KsqlConfig.KSQL_HEADERS_COLUMNS_ENABLED) .addAll(KsqlConfig.SSL_CONFIG_NAMES) + .add(KsqlConfig.KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG) .build(); private ImmutableProperties() { diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java b/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java index 4cfaa3fb2cd6..fa95388af55b 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java @@ -663,6 +663,12 @@ public class KsqlConfig extends AbstractConfig { private static final String KSQL_ENDPOINT_MIGRATE_QUERY_DOC = "Migrates the /query endpoint to use the same handler as /query-stream."; + public static final String KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG + = "ksql.command.topic.rate.limit"; + public static final double KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG_DEFAULT = 10.0; + private static final String KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG_DEFAULT_DOC + = "Sets the number of statements that can be executed against the command topic per second"; + private enum ConfigGeneration { LEGACY, CURRENT @@ -1429,6 +1435,13 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) { Importance.LOW, KSQL_TRANSIENT_QUERY_CLEANUP_SERVICE_PERIOD_SECONDS_DOC ) + .define( + KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG, + Type.DOUBLE, + KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG_DEFAULT, + Importance.LOW, + KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG_DEFAULT_DOC + ) .withClientSslSupport(); for (final CompatibilityBreakingConfigDef compatibilityBreakingConfigDef diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index f02d933352a4..0332e7b4a642 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -932,7 +932,8 @@ static KsqlRestApplication buildApplication( errorHandler, internalTopicClient, commandTopicName, - metricCollectors.getMetrics() + metricCollectors.getMetrics(), + ksqlConfig.getDouble("ksql.command.topic.rate.limit") ); final KsqlResource ksqlResource = new KsqlResource( diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java index 054ab3026392..a2a9bc8ce30e 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java @@ -150,7 +150,8 @@ public CommandRunner( final Errors errorHandler, final KafkaTopicClient kafkaTopicClient, final String commandTopicName, - final Metrics metrics + final Metrics metrics, + final double rateLimit ) { this( statementExecutor, @@ -171,7 +172,8 @@ public CommandRunner( commandDeserializer, errorHandler, () -> kafkaTopicClient.isTopicExists(commandTopicName), - metrics + metrics, + rateLimit ); } @@ -193,7 +195,8 @@ public CommandRunner( final Deserializer commandDeserializer, final Errors errorHandler, final Supplier commandTopicExists, - final Metrics metrics + final Metrics metrics, + final double rateLimit ) { // CHECKSTYLE_RULES.ON: ParameterNumberCheck this.statementExecutor = Objects.requireNonNull(statementExecutor, "statementExecutor"); @@ -220,7 +223,7 @@ public CommandRunner( Objects.requireNonNull(commandTopicExists, "commandTopicExists"); this.incompatibleCommandDetected = false; this.commandTopicDeleted = false; - this.rateLimiter = RateLimiter.create(500.0); + this.rateLimiter = RateLimiter.create(rateLimit); } /** diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java index 755ec661d234..63467bac3cd8 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java @@ -46,6 +46,7 @@ import io.confluent.ksql.rest.util.ClusterTerminator; import io.confluent.ksql.rest.util.PersistentQueryCleanupImpl; import io.confluent.ksql.rest.util.TerminateCluster; +import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.PersistentQueryMetadata; import java.time.Clock; import java.time.Duration; @@ -166,7 +167,8 @@ public void setup() { commandDeserializer, errorHandler, commandTopicExists, - new Metrics() + new Metrics(), + KsqlConfig.KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG_DEFAULT ); } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java index 0b106b682880..b261039d3974 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java @@ -246,7 +246,8 @@ private class KsqlServer { errorHandler, topicClient, "command_topic", - new Metrics() + new Metrics(), + KsqlConfig.KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG_DEFAULT ); this.ksqlResource = new KsqlResource( From 67b1011c6b74aaac4bff8d2e6471b95b6521e5ba Mon Sep 17 00:00:00 2001 From: Leah Thomas Date: Fri, 25 Feb 2022 09:40:05 -0600 Subject: [PATCH 3/8] make rate limiting return an error --- .../ksql/rest/server/computation/CommandRunner.java | 8 +++++++- .../src/main/java/io/confluent/ksql/rest/Errors.java | 10 ++++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java index a2a9bc8ce30e..806a31798d17 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java @@ -21,11 +21,13 @@ import io.confluent.ksql.rest.Errors; import io.confluent.ksql.rest.entity.ClusterTerminateRequest; import io.confluent.ksql.rest.server.resources.IncompatibleKsqlCommandVersionException; +import io.confluent.ksql.rest.server.resources.KsqlRestException; import io.confluent.ksql.rest.server.state.ServerState; import io.confluent.ksql.rest.util.ClusterTerminator; import io.confluent.ksql.rest.util.PersistentQueryCleanupImpl; import io.confluent.ksql.rest.util.TerminateCluster; import io.confluent.ksql.services.KafkaTopicClient; +import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.Pair; import io.confluent.ksql.util.PersistentQueryMetadata; import io.confluent.ksql.util.QueryMetadata; @@ -352,7 +354,11 @@ private void executeStatement(final QueuedCommand queuedCommand) { if (closed) { LOG.info("Execution aborted as system is closing down"); } else { - rateLimiter.acquire(); + if (!rateLimiter.tryAcquire()) { + throw new KsqlRestException( + Errors.tooManyRequests("Too many requests to the command topic within a 1 second timeframe") + ); + } statementExecutor.handleStatement(queuedCommand); LOG.info("Executed statement: " + commandStatement); } diff --git a/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/Errors.java b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/Errors.java index 37e88421f9e9..89191e011d34 100644 --- a/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/Errors.java +++ b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/Errors.java @@ -23,6 +23,7 @@ import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND; import static io.netty.handler.codec.http.HttpResponseStatus.PRECONDITION_REQUIRED; import static io.netty.handler.codec.http.HttpResponseStatus.SERVICE_UNAVAILABLE; +import static io.netty.handler.codec.http.HttpResponseStatus.TOO_MANY_REQUESTS; import static io.netty.handler.codec.http.HttpResponseStatus.UNAUTHORIZED; import io.confluent.ksql.rest.entity.KsqlEntityList; @@ -72,6 +73,8 @@ public final class Errors { public static final int ERROR_CODE_SERVER_ERROR = toErrorCode(INTERNAL_SERVER_ERROR.code()); + public static final int ERROR_CODE_TOO_MANY_REQUESTS = toErrorCode(TOO_MANY_REQUESTS.code()); + private final ErrorMessages errorMessages; public static int toStatusCode(final int errorCode) { @@ -208,6 +211,13 @@ public static EndpointResponse serverNotReady(final KsqlErrorMessage error) { .build(); } + public static EndpointResponse tooManyRequests(final String msg) { + return EndpointResponse.create() + .status(TOO_MANY_REQUESTS.code()) + .entity(new KsqlErrorMessage(ERROR_CODE_TOO_MANY_REQUESTS, msg)) + .build(); + } + public Errors(final ErrorMessages errorMessages) { this.errorMessages = Objects.requireNonNull(errorMessages, "errorMessages"); } From 5b543be5f4a370f37323e9f5b6c0c56a201f0462 Mon Sep 17 00:00:00 2001 From: Leah Thomas Date: Fri, 25 Feb 2022 12:40:40 -0600 Subject: [PATCH 4/8] test + review --- .../ksql/rest/server/KsqlRestApplication.java | 2 +- .../server/computation/CommandRunner.java | 6 ++-- .../server/computation/CommandRunnerTest.java | 36 +++++++++++++++++++ 3 files changed, 40 insertions(+), 4 deletions(-) diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index 0332e7b4a642..ff2d6730b452 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -933,7 +933,7 @@ static KsqlRestApplication buildApplication( internalTopicClient, commandTopicName, metricCollectors.getMetrics(), - ksqlConfig.getDouble("ksql.command.topic.rate.limit") + ksqlConfig.getDouble(KsqlConfig.KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG) ); final KsqlResource ksqlResource = new KsqlResource( diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java index 806a31798d17..a712159806cf 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java @@ -27,7 +27,6 @@ import io.confluent.ksql.rest.util.PersistentQueryCleanupImpl; import io.confluent.ksql.rest.util.TerminateCluster; import io.confluent.ksql.services.KafkaTopicClient; -import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.Pair; import io.confluent.ksql.util.PersistentQueryMetadata; import io.confluent.ksql.util.QueryMetadata; @@ -356,8 +355,9 @@ private void executeStatement(final QueuedCommand queuedCommand) { } else { if (!rateLimiter.tryAcquire()) { throw new KsqlRestException( - Errors.tooManyRequests("Too many requests to the command topic within a 1 second timeframe") - ); + Errors.tooManyRequests( + "Too many requests to the command topic within a 1 second timeframe" + )); } statementExecutor.handleStatement(queuedCommand); LOG.info("Executed statement: " + commandStatement); diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java index 63467bac3cd8..ad9f770be849 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java @@ -17,8 +17,10 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; @@ -41,7 +43,9 @@ import io.confluent.ksql.engine.KsqlEngine; import io.confluent.ksql.metrics.MetricCollectors; import io.confluent.ksql.rest.Errors; +import io.confluent.ksql.rest.entity.KsqlErrorMessage; import io.confluent.ksql.rest.server.resources.IncompatibleKsqlCommandVersionException; +import io.confluent.ksql.rest.server.resources.KsqlRestException; import io.confluent.ksql.rest.server.state.ServerState; import io.confluent.ksql.rest.util.ClusterTerminator; import io.confluent.ksql.rest.util.PersistentQueryCleanupImpl; @@ -653,6 +657,38 @@ public void shouldCloseTheCommandRunnerCorrectly() throws Exception { verify(commandStore).close(); } + + @Test + public void shouldThrowWhenRateLimitHit() { + // Given: + final CommandRunner rateLimitedCommandRunner = new CommandRunner( + statementExecutor, + commandStore, + 3, + clusterTerminator, + executor, + serverState, + "ksql-service-id", + Duration.ofMillis(COMMAND_RUNNER_HEALTH_TIMEOUT), + "", + clock, + compactor, + incompatibleCommandChecker, + commandDeserializer, + errorHandler, + commandTopicExists, + new Metrics(), + 1.0 + ); + givenQueuedCommands(queuedCommand1, queuedCommand2, queuedCommand3); + + // When: + // Then: + final KsqlRestException e = assertThrows(KsqlRestException.class, rateLimitedCommandRunner::fetchAndRunCommands); + assertEquals(e.getResponse().getStatus(), 429); + final KsqlErrorMessage errorMessage = (KsqlErrorMessage) e.getResponse().getEntity(); + assertTrue(errorMessage.getMessage().contains("Too many requests to the command topic within a 1 second timeframe")); + } private Runnable getThreadTask() { verify(executor).execute(threadTaskCaptor.capture()); From 6d168be4ccce8123aac203ec7921f0d0bde68812 Mon Sep 17 00:00:00 2001 From: Leah Thomas Date: Mon, 28 Feb 2022 09:30:15 -0600 Subject: [PATCH 5/8] move rate limiting to commandstore --- .../ksql/rest/server/KsqlRestApplication.java | 3 +- .../server/computation/CommandRunner.java | 19 ++------ .../rest/server/computation/CommandStore.java | 22 +++++++++- .../server/computation/CommandRunnerTest.java | 44 +------------------ .../server/computation/CommandStoreTest.java | 39 +++++++++++++++- .../rest/server/computation/RecoveryTest.java | 3 +- 6 files changed, 64 insertions(+), 66 deletions(-) diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index ff2d6730b452..f02d933352a4 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -932,8 +932,7 @@ static KsqlRestApplication buildApplication( errorHandler, internalTopicClient, commandTopicName, - metricCollectors.getMetrics(), - ksqlConfig.getDouble(KsqlConfig.KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG) + metricCollectors.getMetrics() ); final KsqlResource ksqlResource = new KsqlResource( diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java index a712159806cf..855fab434a2b 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java @@ -16,12 +16,10 @@ package io.confluent.ksql.rest.server.computation; import com.google.common.annotations.VisibleForTesting; -import com.google.common.util.concurrent.RateLimiter; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.confluent.ksql.rest.Errors; import io.confluent.ksql.rest.entity.ClusterTerminateRequest; import io.confluent.ksql.rest.server.resources.IncompatibleKsqlCommandVersionException; -import io.confluent.ksql.rest.server.resources.KsqlRestException; import io.confluent.ksql.rest.server.state.ServerState; import io.confluent.ksql.rest.util.ClusterTerminator; import io.confluent.ksql.rest.util.PersistentQueryCleanupImpl; @@ -92,7 +90,6 @@ public class CommandRunner implements Closeable { private final Supplier commandTopicExists; private boolean commandTopicDeleted; private Status state = new Status(CommandRunnerStatus.RUNNING, CommandRunnerDegradedReason.NONE); - private RateLimiter rateLimiter; public enum CommandRunnerStatus { RUNNING, @@ -151,8 +148,7 @@ public CommandRunner( final Errors errorHandler, final KafkaTopicClient kafkaTopicClient, final String commandTopicName, - final Metrics metrics, - final double rateLimit + final Metrics metrics ) { this( statementExecutor, @@ -173,8 +169,7 @@ public CommandRunner( commandDeserializer, errorHandler, () -> kafkaTopicClient.isTopicExists(commandTopicName), - metrics, - rateLimit + metrics ); } @@ -196,8 +191,7 @@ public CommandRunner( final Deserializer commandDeserializer, final Errors errorHandler, final Supplier commandTopicExists, - final Metrics metrics, - final double rateLimit + final Metrics metrics ) { // CHECKSTYLE_RULES.ON: ParameterNumberCheck this.statementExecutor = Objects.requireNonNull(statementExecutor, "statementExecutor"); @@ -224,7 +218,6 @@ public CommandRunner( Objects.requireNonNull(commandTopicExists, "commandTopicExists"); this.incompatibleCommandDetected = false; this.commandTopicDeleted = false; - this.rateLimiter = RateLimiter.create(rateLimit); } /** @@ -353,12 +346,6 @@ private void executeStatement(final QueuedCommand queuedCommand) { if (closed) { LOG.info("Execution aborted as system is closing down"); } else { - if (!rateLimiter.tryAcquire()) { - throw new KsqlRestException( - Errors.tooManyRequests( - "Too many requests to the command topic within a 1 second timeframe" - )); - } statementExecutor.handleStatement(queuedCommand); LOG.info("Executed statement: " + commandStatement); } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandStore.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandStore.java index f2265d48b972..488ca123482d 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandStore.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandStore.java @@ -17,11 +17,14 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.util.concurrent.RateLimiter; +import io.confluent.ksql.rest.Errors; import io.confluent.ksql.rest.entity.CommandId; import io.confluent.ksql.rest.server.CommandTopic; import io.confluent.ksql.rest.server.CommandTopicBackup; import io.confluent.ksql.rest.server.CommandTopicBackupImpl; import io.confluent.ksql.rest.server.CommandTopicBackupNoOp; +import io.confluent.ksql.rest.server.resources.KsqlRestException; import io.confluent.ksql.rest.util.CommandTopicBackupUtil; import io.confluent.ksql.services.KafkaTopicClient; import io.confluent.ksql.util.KsqlConfig; @@ -79,6 +82,7 @@ public class CommandStore implements CommandQueue, Closeable { private final Serializer commandSerializer; private final Deserializer commandIdDeserializer; private final CommandTopicBackup commandTopicBackup; + private final RateLimiter rateLimiter; public static final class Factory { @@ -118,6 +122,8 @@ public static CommandStore create( internalTopicClient ); } + final double rateLimit = + ksqlConfig.getDouble(KsqlConfig.KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG); return new CommandStore( commandTopicName, @@ -133,11 +139,13 @@ public static CommandStore create( InternalTopicSerdes.serializer(), InternalTopicSerdes.serializer(), InternalTopicSerdes.deserializer(CommandId.class), - commandTopicBackup + commandTopicBackup, + RateLimiter.create(rateLimit) ); } } + // CHECKSTYLE_RULES.OFF: ParameterNumberCheck CommandStore( final String commandTopicName, final CommandTopic commandTopic, @@ -148,8 +156,10 @@ public static CommandStore create( final Serializer commandIdSerializer, final Serializer commandSerializer, final Deserializer commandIdDeserializer, - final CommandTopicBackup commandTopicBackup + final CommandTopicBackup commandTopicBackup, + final RateLimiter rateLimiter ) { + // CHECKSTYLE_RULES.ON: ParameterNumberCheck this.commandTopic = Objects.requireNonNull(commandTopic, "commandTopic"); this.commandStatusMap = Maps.newConcurrentMap(); this.sequenceNumberFutureStore = @@ -169,6 +179,8 @@ public static CommandStore create( Objects.requireNonNull(commandIdDeserializer, "commandIdDeserializer"); this.commandTopicBackup = Objects.requireNonNull(commandTopicBackup, "commandTopicBackup"); + this.rateLimiter = + Objects.requireNonNull(rateLimiter, "rateLimiter"); } @Override @@ -198,6 +210,12 @@ public QueuedCommandStatus enqueueCommand( final Command command, final Producer transactionalProducer ) { + if (!rateLimiter.tryAcquire()) { + throw new KsqlRestException( + Errors.tooManyRequests( + "Too many writes to the command topic within a 1 second timeframe" + )); + } final CommandStatusFuture statusFuture = commandStatusMap.compute( commandId, (k, v) -> { diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java index ad9f770be849..3efe4f6837bc 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java @@ -17,13 +17,9 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThrows; -import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyLong; @@ -32,7 +28,6 @@ import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.inOrder; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -41,16 +36,12 @@ import com.google.common.collect.ImmutableList; import io.confluent.ksql.engine.KsqlEngine; -import io.confluent.ksql.metrics.MetricCollectors; import io.confluent.ksql.rest.Errors; -import io.confluent.ksql.rest.entity.KsqlErrorMessage; import io.confluent.ksql.rest.server.resources.IncompatibleKsqlCommandVersionException; -import io.confluent.ksql.rest.server.resources.KsqlRestException; import io.confluent.ksql.rest.server.state.ServerState; import io.confluent.ksql.rest.util.ClusterTerminator; import io.confluent.ksql.rest.util.PersistentQueryCleanupImpl; import io.confluent.ksql.rest.util.TerminateCluster; -import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.PersistentQueryMetadata; import java.time.Clock; import java.time.Duration; @@ -171,8 +162,7 @@ public void setup() { commandDeserializer, errorHandler, commandTopicExists, - new Metrics(), - KsqlConfig.KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG_DEFAULT + new Metrics() ); } @@ -657,38 +647,6 @@ public void shouldCloseTheCommandRunnerCorrectly() throws Exception { verify(commandStore).close(); } - - @Test - public void shouldThrowWhenRateLimitHit() { - // Given: - final CommandRunner rateLimitedCommandRunner = new CommandRunner( - statementExecutor, - commandStore, - 3, - clusterTerminator, - executor, - serverState, - "ksql-service-id", - Duration.ofMillis(COMMAND_RUNNER_HEALTH_TIMEOUT), - "", - clock, - compactor, - incompatibleCommandChecker, - commandDeserializer, - errorHandler, - commandTopicExists, - new Metrics(), - 1.0 - ); - givenQueuedCommands(queuedCommand1, queuedCommand2, queuedCommand3); - - // When: - // Then: - final KsqlRestException e = assertThrows(KsqlRestException.class, rateLimitedCommandRunner::fetchAndRunCommands); - assertEquals(e.getResponse().getStatus(), 429); - final KsqlErrorMessage errorMessage = (KsqlErrorMessage) e.getResponse().getEntity(); - assertTrue(errorMessage.getMessage().contains("Too many requests to the command topic within a 1 second timeframe")); - } private Runnable getThreadTask() { verify(executor).execute(threadTaskCaptor.capture()); diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandStoreTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandStoreTest.java index cb6aadbad69c..196ab0425a5b 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandStoreTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandStoreTest.java @@ -23,7 +23,9 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.core.IsEqual.equalTo; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; @@ -32,10 +34,14 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.google.common.util.concurrent.RateLimiter; import io.confluent.ksql.rest.entity.CommandId; import io.confluent.ksql.rest.entity.CommandStatus; +import io.confluent.ksql.rest.entity.KsqlErrorMessage; import io.confluent.ksql.rest.server.CommandTopic; import io.confluent.ksql.rest.server.CommandTopicBackup; +import io.confluent.ksql.rest.server.resources.KsqlRestException; +import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; import java.time.Duration; import java.util.ArrayList; @@ -152,7 +158,8 @@ public void setUp() { commandIdSerializer, commandSerializer, commandIdDeserializer, - commandTopicBackup + commandTopicBackup, + RateLimiter.create(KsqlConfig.KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG_DEFAULT) ); } @@ -379,6 +386,36 @@ public void shouldSuccessfullyAbortAndRetry() { commandStore.enqueueCommand(commandId, command, transactionalProducer); } + @Test + public void shouldFailEnqueueIfRateLimitHit() { + // Given: + final CommandStore lowRateLimitStore = new CommandStore( + COMMAND_TOPIC_NAME, + commandTopic, + sequenceNumberFutureStore, + Collections.emptyMap(), + Collections.emptyMap(), + TIMEOUT, + commandIdSerializer, + commandSerializer, + commandIdDeserializer, + commandTopicBackup, + RateLimiter.create(1.0) + ); + + // When: + lowRateLimitStore.enqueueCommand(commandId, command, transactionalProducer); + + // Then: + final KsqlRestException e = assertThrows( + KsqlRestException.class, + () -> lowRateLimitStore.enqueueCommand(commandId, command, transactionalProducer) + ); + assertEquals(e.getResponse().getStatus(), 429); + final KsqlErrorMessage errorMessage = (KsqlErrorMessage) e.getResponse().getEntity(); + assertTrue(errorMessage.getMessage().contains("Too many writes to the command topic within a 1 second timeframe")); + } + private static ConsumerRecords buildRecords(final Object... args) { assertThat(args.length % 2, equalTo(0)); final List> records = new ArrayList<>(); diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java index b261039d3974..0b106b682880 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java @@ -246,8 +246,7 @@ private class KsqlServer { errorHandler, topicClient, "command_topic", - new Metrics(), - KsqlConfig.KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG_DEFAULT + new Metrics() ); this.ksqlResource = new KsqlResource( From e2465baedd2867cd4d5b0105895d53d550f80190 Mon Sep 17 00:00:00 2001 From: Leah Thomas Date: Mon, 28 Feb 2022 14:59:10 -0600 Subject: [PATCH 6/8] move rate limiting to distributing executor --- .../rest/server/computation/CommandStore.java | 22 +---------- .../computation/DistributingExecutor.java | 12 ++++++ .../server/computation/CommandStoreTest.java | 39 +------------------ .../computation/DistributingExecutorTest.java | 33 ++++++++++++++++ 4 files changed, 48 insertions(+), 58 deletions(-) diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandStore.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandStore.java index 488ca123482d..f2265d48b972 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandStore.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandStore.java @@ -17,14 +17,11 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.common.util.concurrent.RateLimiter; -import io.confluent.ksql.rest.Errors; import io.confluent.ksql.rest.entity.CommandId; import io.confluent.ksql.rest.server.CommandTopic; import io.confluent.ksql.rest.server.CommandTopicBackup; import io.confluent.ksql.rest.server.CommandTopicBackupImpl; import io.confluent.ksql.rest.server.CommandTopicBackupNoOp; -import io.confluent.ksql.rest.server.resources.KsqlRestException; import io.confluent.ksql.rest.util.CommandTopicBackupUtil; import io.confluent.ksql.services.KafkaTopicClient; import io.confluent.ksql.util.KsqlConfig; @@ -82,7 +79,6 @@ public class CommandStore implements CommandQueue, Closeable { private final Serializer commandSerializer; private final Deserializer commandIdDeserializer; private final CommandTopicBackup commandTopicBackup; - private final RateLimiter rateLimiter; public static final class Factory { @@ -122,8 +118,6 @@ public static CommandStore create( internalTopicClient ); } - final double rateLimit = - ksqlConfig.getDouble(KsqlConfig.KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG); return new CommandStore( commandTopicName, @@ -139,13 +133,11 @@ public static CommandStore create( InternalTopicSerdes.serializer(), InternalTopicSerdes.serializer(), InternalTopicSerdes.deserializer(CommandId.class), - commandTopicBackup, - RateLimiter.create(rateLimit) + commandTopicBackup ); } } - // CHECKSTYLE_RULES.OFF: ParameterNumberCheck CommandStore( final String commandTopicName, final CommandTopic commandTopic, @@ -156,10 +148,8 @@ public static CommandStore create( final Serializer commandIdSerializer, final Serializer commandSerializer, final Deserializer commandIdDeserializer, - final CommandTopicBackup commandTopicBackup, - final RateLimiter rateLimiter + final CommandTopicBackup commandTopicBackup ) { - // CHECKSTYLE_RULES.ON: ParameterNumberCheck this.commandTopic = Objects.requireNonNull(commandTopic, "commandTopic"); this.commandStatusMap = Maps.newConcurrentMap(); this.sequenceNumberFutureStore = @@ -179,8 +169,6 @@ public static CommandStore create( Objects.requireNonNull(commandIdDeserializer, "commandIdDeserializer"); this.commandTopicBackup = Objects.requireNonNull(commandTopicBackup, "commandTopicBackup"); - this.rateLimiter = - Objects.requireNonNull(rateLimiter, "rateLimiter"); } @Override @@ -210,12 +198,6 @@ public QueuedCommandStatus enqueueCommand( final Command command, final Producer transactionalProducer ) { - if (!rateLimiter.tryAcquire()) { - throw new KsqlRestException( - Errors.tooManyRequests( - "Too many writes to the command topic within a 1 second timeframe" - )); - } final CommandStatusFuture statusFuture = commandStatusMap.compute( commandId, (k, v) -> { diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java index 428b43138f73..d91ab4a90ed9 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java @@ -14,6 +14,7 @@ package io.confluent.ksql.rest.server.computation; +import com.google.common.util.concurrent.RateLimiter; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.confluent.ksql.KsqlExecutionContext; import io.confluent.ksql.metastore.MetaStore; @@ -32,6 +33,7 @@ import io.confluent.ksql.rest.entity.KsqlWarning; import io.confluent.ksql.rest.entity.WarningEntity; import io.confluent.ksql.rest.server.execution.StatementExecutorResponse; +import io.confluent.ksql.rest.server.resources.KsqlRestException; import io.confluent.ksql.security.KsqlAuthorizationValidator; import io.confluent.ksql.security.KsqlSecurityContext; import io.confluent.ksql.services.ServiceContext; @@ -70,6 +72,7 @@ public class DistributingExecutor { private final ReservedInternalTopics internalTopics; private final Errors errorHandler; private final Supplier commandRunnerWarning; + private final RateLimiter rateLimiter; @SuppressFBWarnings(value = "EI_EXPOSE_REP2") public DistributingExecutor( @@ -98,6 +101,8 @@ public DistributingExecutor( this.errorHandler = Objects.requireNonNull(errorHandler, "errorHandler"); this.commandRunnerWarning = Objects.requireNonNull(commandRunnerWarning, "commandRunnerWarning"); + this.rateLimiter = + RateLimiter.create(ksqlConfig.getDouble(KsqlConfig.KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG)); } // CHECKSTYLE_RULES.OFF: CyclomaticComplexity @@ -196,6 +201,13 @@ public StatementExecutorResponse execute( statement.getStatementText()), e); } + if (!rateLimiter.tryAcquire()) { + throw new KsqlRestException( + Errors.tooManyRequests( + "Too many writes to the command topic within a 1 second timeframe" + )); + } + CommandId commandId = null; try { transactionalProducer.beginTransaction(); diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandStoreTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandStoreTest.java index 196ab0425a5b..cb6aadbad69c 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandStoreTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandStoreTest.java @@ -23,9 +23,7 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.core.IsEqual.equalTo; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; -import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; @@ -34,14 +32,10 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import com.google.common.util.concurrent.RateLimiter; import io.confluent.ksql.rest.entity.CommandId; import io.confluent.ksql.rest.entity.CommandStatus; -import io.confluent.ksql.rest.entity.KsqlErrorMessage; import io.confluent.ksql.rest.server.CommandTopic; import io.confluent.ksql.rest.server.CommandTopicBackup; -import io.confluent.ksql.rest.server.resources.KsqlRestException; -import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; import java.time.Duration; import java.util.ArrayList; @@ -158,8 +152,7 @@ public void setUp() { commandIdSerializer, commandSerializer, commandIdDeserializer, - commandTopicBackup, - RateLimiter.create(KsqlConfig.KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG_DEFAULT) + commandTopicBackup ); } @@ -386,36 +379,6 @@ public void shouldSuccessfullyAbortAndRetry() { commandStore.enqueueCommand(commandId, command, transactionalProducer); } - @Test - public void shouldFailEnqueueIfRateLimitHit() { - // Given: - final CommandStore lowRateLimitStore = new CommandStore( - COMMAND_TOPIC_NAME, - commandTopic, - sequenceNumberFutureStore, - Collections.emptyMap(), - Collections.emptyMap(), - TIMEOUT, - commandIdSerializer, - commandSerializer, - commandIdDeserializer, - commandTopicBackup, - RateLimiter.create(1.0) - ); - - // When: - lowRateLimitStore.enqueueCommand(commandId, command, transactionalProducer); - - // Then: - final KsqlRestException e = assertThrows( - KsqlRestException.class, - () -> lowRateLimitStore.enqueueCommand(commandId, command, transactionalProducer) - ); - assertEquals(e.getResponse().getStatus(), 429); - final KsqlErrorMessage errorMessage = (KsqlErrorMessage) e.getResponse().getEntity(); - assertTrue(errorMessage.getMessage().contains("Too many writes to the command topic within a 1 second timeframe")); - } - private static ConsumerRecords buildRecords(final Object... args) { assertThat(args.length % 2, equalTo(0)); final List> records = new ArrayList<>(); diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/DistributingExecutorTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/DistributingExecutorTest.java index 9df37f19a74b..a3243f7ec50e 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/DistributingExecutorTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/DistributingExecutorTest.java @@ -20,7 +20,9 @@ import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.isA; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doNothing; @@ -59,8 +61,10 @@ import io.confluent.ksql.rest.entity.CommandStatus; import io.confluent.ksql.rest.entity.CommandStatus.Status; import io.confluent.ksql.rest.entity.CommandStatusEntity; +import io.confluent.ksql.rest.entity.KsqlErrorMessage; import io.confluent.ksql.rest.entity.WarningEntity; import io.confluent.ksql.rest.server.execution.StatementExecutorResponse; +import io.confluent.ksql.rest.server.resources.KsqlRestException; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.security.KsqlAuthorizationValidator; import io.confluent.ksql.security.KsqlSecurityContext; @@ -496,4 +500,33 @@ CommonCreateConfigs.VALUE_FORMAT_PROPERTY, new StringLiteral("json") assertThat("Should be present", response.getEntity().isPresent()); assertThat(((WarningEntity) response.getEntity().get()).getMessage(), containsString("")); } + + @Test + public void shouldThrowIfRateLimitHit() { + // Given: + final DistributingExecutor rateLimitedDistributor = new DistributingExecutor( + new KsqlConfig(ImmutableMap.of("ksql.command.topic.rate.limit", 1.0)), + queue, + DURATION_10_MS, + (ec, sc) -> InjectorChain.of(schemaInjector, topicInjector), + Optional.of(authorizationValidator), + validatedCommandFactory, + errorHandler, + commandRunnerWarning + ); + + // When: + distributor.execute(CONFIGURED_STATEMENT, executionContext, securityContext); + + + // Then: + final KsqlRestException e = assertThrows( + KsqlRestException.class, + () -> distributor.execute(CONFIGURED_STATEMENT, executionContext, securityContext) + ); + + assertEquals(e.getResponse().getStatus(), 429); + final KsqlErrorMessage errorMessage = (KsqlErrorMessage) e.getResponse().getEntity(); + assertTrue(errorMessage.getMessage().contains("Too many writes to the command topic within a 1 second timeframe")); + } } From 3aedfb105661bbb0ddff4281f3a980ee087e9546 Mon Sep 17 00:00:00 2001 From: Leah Thomas Date: Tue, 1 Mar 2022 11:00:06 -0600 Subject: [PATCH 7/8] move config to rest config --- .../confluent/ksql/config/ImmutableProperties.java | 1 - .../java/io/confluent/ksql/util/KsqlConfig.java | 13 ------------- .../confluent/ksql/rest/server/KsqlRestConfig.java | 14 +++++++++++++- .../server/computation/DistributingExecutor.java | 4 +++- .../computation/DistributingExecutorTest.java | 7 ++++--- 5 files changed, 20 insertions(+), 19 deletions(-) diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/config/ImmutableProperties.java b/ksqldb-common/src/main/java/io/confluent/ksql/config/ImmutableProperties.java index 2e0c69a02c21..3022f2004194 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/config/ImmutableProperties.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/config/ImmutableProperties.java @@ -36,7 +36,6 @@ public final class ImmutableProperties { .add(KsqlConfig.KSQL_SOURCE_TABLE_MATERIALIZATION_ENABLED) .add(KsqlConfig.KSQL_HEADERS_COLUMNS_ENABLED) .addAll(KsqlConfig.SSL_CONFIG_NAMES) - .add(KsqlConfig.KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG) .build(); private ImmutableProperties() { diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java b/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java index fa95388af55b..4cfaa3fb2cd6 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java @@ -663,12 +663,6 @@ public class KsqlConfig extends AbstractConfig { private static final String KSQL_ENDPOINT_MIGRATE_QUERY_DOC = "Migrates the /query endpoint to use the same handler as /query-stream."; - public static final String KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG - = "ksql.command.topic.rate.limit"; - public static final double KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG_DEFAULT = 10.0; - private static final String KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG_DEFAULT_DOC - = "Sets the number of statements that can be executed against the command topic per second"; - private enum ConfigGeneration { LEGACY, CURRENT @@ -1435,13 +1429,6 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) { Importance.LOW, KSQL_TRANSIENT_QUERY_CLEANUP_SERVICE_PERIOD_SECONDS_DOC ) - .define( - KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG, - Type.DOUBLE, - KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG_DEFAULT, - Importance.LOW, - KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG_DEFAULT_DOC - ) .withClientSslSupport(); for (final CompatibilityBreakingConfigDef compatibilityBreakingConfigDef diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java index 19bb06ec03cd..6b9758e46593 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java @@ -396,6 +396,12 @@ public class KsqlRestConfig extends AbstractConfig { + "returns a 421 mis-directed response. (NOTE: this check should not be enabled if " + "ksqlDB servers have mutual TLS enabled)"; + public static final String KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG = + KSQL_CONFIG_PREFIX + "server.command.topic.rate.limit"; + public static final double KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG_DEFAULT = Double.MAX_VALUE; + private static final String KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG_DEFAULT_DOC + = "Sets the number of statements that can be executed against the command topic per second"; + private static final ConfigDef CONFIG_DEF; static { @@ -749,7 +755,13 @@ public class KsqlRestConfig extends AbstractConfig { KSQL_SERVER_SNI_CHECK_ENABLE_DEFAULT, Importance.LOW, KSQL_SERVER_SNI_CHECK_ENABLE_DOC - ); + ).define( + KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG, + Type.DOUBLE, + KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG_DEFAULT, + Importance.LOW, + KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG_DEFAULT_DOC + ); } public KsqlRestConfig(final Map props) { diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java index d91ab4a90ed9..614d528e1228 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java @@ -32,6 +32,7 @@ import io.confluent.ksql.rest.entity.CommandStatusEntity; import io.confluent.ksql.rest.entity.KsqlWarning; import io.confluent.ksql.rest.entity.WarningEntity; +import io.confluent.ksql.rest.server.KsqlRestConfig; import io.confluent.ksql.rest.server.execution.StatementExecutorResponse; import io.confluent.ksql.rest.server.resources.KsqlRestException; import io.confluent.ksql.security.KsqlAuthorizationValidator; @@ -101,8 +102,9 @@ public DistributingExecutor( this.errorHandler = Objects.requireNonNull(errorHandler, "errorHandler"); this.commandRunnerWarning = Objects.requireNonNull(commandRunnerWarning, "commandRunnerWarning"); + final KsqlRestConfig restConfig = new KsqlRestConfig(ksqlConfig.originals()); this.rateLimiter = - RateLimiter.create(ksqlConfig.getDouble(KsqlConfig.KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG)); + RateLimiter.create(restConfig.getDouble(KsqlRestConfig.KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG)); } // CHECKSTYLE_RULES.OFF: CyclomaticComplexity diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/DistributingExecutorTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/DistributingExecutorTest.java index a3243f7ec50e..1b1b0d7c6275 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/DistributingExecutorTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/DistributingExecutorTest.java @@ -63,6 +63,7 @@ import io.confluent.ksql.rest.entity.CommandStatusEntity; import io.confluent.ksql.rest.entity.KsqlErrorMessage; import io.confluent.ksql.rest.entity.WarningEntity; +import io.confluent.ksql.rest.server.KsqlRestConfig; import io.confluent.ksql.rest.server.execution.StatementExecutorResponse; import io.confluent.ksql.rest.server.resources.KsqlRestException; import io.confluent.ksql.schema.ksql.LogicalSchema; @@ -505,7 +506,7 @@ CommonCreateConfigs.VALUE_FORMAT_PROPERTY, new StringLiteral("json") public void shouldThrowIfRateLimitHit() { // Given: final DistributingExecutor rateLimitedDistributor = new DistributingExecutor( - new KsqlConfig(ImmutableMap.of("ksql.command.topic.rate.limit", 1.0)), + new KsqlConfig(ImmutableMap.of(KsqlRestConfig.KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG, 1.0)), queue, DURATION_10_MS, (ec, sc) -> InjectorChain.of(schemaInjector, topicInjector), @@ -516,13 +517,13 @@ public void shouldThrowIfRateLimitHit() { ); // When: - distributor.execute(CONFIGURED_STATEMENT, executionContext, securityContext); + rateLimitedDistributor.execute(CONFIGURED_STATEMENT, executionContext, securityContext); // Then: final KsqlRestException e = assertThrows( KsqlRestException.class, - () -> distributor.execute(CONFIGURED_STATEMENT, executionContext, securityContext) + () -> rateLimitedDistributor.execute(CONFIGURED_STATEMENT, executionContext, securityContext) ); assertEquals(e.getResponse().getStatus(), 429); From 181d7623a0bf6e5ef644f51ccf294eb3a8360f18 Mon Sep 17 00:00:00 2001 From: Leah Thomas Date: Tue, 1 Mar 2022 11:36:53 -0600 Subject: [PATCH 8/8] rohan's review --- .../java/io/confluent/ksql/rest/server/KsqlRestConfig.java | 6 +++--- .../ksql/rest/server/computation/DistributingExecutor.java | 7 +++++-- .../rest/server/computation/DistributingExecutorTest.java | 4 ++-- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java index 6b9758e46593..55c60f354cca 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java @@ -397,10 +397,10 @@ public class KsqlRestConfig extends AbstractConfig { + "ksqlDB servers have mutual TLS enabled)"; public static final String KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG = - KSQL_CONFIG_PREFIX + "server.command.topic.rate.limit"; + KSQL_CONFIG_PREFIX + "server.command.topic.rate.limit"; public static final double KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG_DEFAULT = Double.MAX_VALUE; - private static final String KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG_DEFAULT_DOC - = "Sets the number of statements that can be executed against the command topic per second"; + private static final String KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG_DEFAULT_DOC = + "Sets the number of statements that can be executed against the command topic per second"; private static final ConfigDef CONFIG_DEF; diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java index 614d528e1228..3ba1665bbbda 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java @@ -49,6 +49,7 @@ import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.TimeUnit; import java.util.function.BiFunction; import java.util.function.Supplier; import org.apache.kafka.clients.producer.Producer; @@ -63,7 +64,9 @@ * duration for the command to be executed remotely if configured with a * {@code distributedCmdResponseTimeout}. */ +// CHECKSTYLE_RULES.OFF: ClassDataAbstractionCoupling public class DistributingExecutor { + // CHECKSTYLE_RULES.ON: ClassDataAbstractionCoupling private final CommandQueue commandQueue; private final Duration distributedCmdResponseTimeout; private final BiFunction injectorFactory; @@ -203,10 +206,10 @@ public StatementExecutorResponse execute( statement.getStatementText()), e); } - if (!rateLimiter.tryAcquire()) { + if (!rateLimiter.tryAcquire(1, TimeUnit.SECONDS)) { throw new KsqlRestException( Errors.tooManyRequests( - "Too many writes to the command topic within a 1 second timeframe" + "DDL/DML rate is crossing the configured rate limit of statements/second" )); } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/DistributingExecutorTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/DistributingExecutorTest.java index 1b1b0d7c6275..931cb34f81bb 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/DistributingExecutorTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/DistributingExecutorTest.java @@ -506,7 +506,7 @@ CommonCreateConfigs.VALUE_FORMAT_PROPERTY, new StringLiteral("json") public void shouldThrowIfRateLimitHit() { // Given: final DistributingExecutor rateLimitedDistributor = new DistributingExecutor( - new KsqlConfig(ImmutableMap.of(KsqlRestConfig.KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG, 1.0)), + new KsqlConfig(ImmutableMap.of(KsqlRestConfig.KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG, 0.5)), queue, DURATION_10_MS, (ec, sc) -> InjectorChain.of(schemaInjector, topicInjector), @@ -528,6 +528,6 @@ public void shouldThrowIfRateLimitHit() { assertEquals(e.getResponse().getStatus(), 429); final KsqlErrorMessage errorMessage = (KsqlErrorMessage) e.getResponse().getEntity(); - assertTrue(errorMessage.getMessage().contains("Too many writes to the command topic within a 1 second timeframe")); + assertTrue(errorMessage.getMessage().contains("DDL/DML rate is crossing the configured rate limit of statements/second")); } }