From 5b543be5f4a370f37323e9f5b6c0c56a201f0462 Mon Sep 17 00:00:00 2001 From: Leah Thomas Date: Fri, 25 Feb 2022 12:40:40 -0600 Subject: [PATCH] test + review --- .../ksql/rest/server/KsqlRestApplication.java | 2 +- .../server/computation/CommandRunner.java | 6 ++-- .../server/computation/CommandRunnerTest.java | 36 +++++++++++++++++++ 3 files changed, 40 insertions(+), 4 deletions(-) diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index 0332e7b4a642..ff2d6730b452 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -933,7 +933,7 @@ static KsqlRestApplication buildApplication( internalTopicClient, commandTopicName, metricCollectors.getMetrics(), - ksqlConfig.getDouble("ksql.command.topic.rate.limit") + ksqlConfig.getDouble(KsqlConfig.KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG) ); final KsqlResource ksqlResource = new KsqlResource( diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java index 806a31798d17..a712159806cf 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java @@ -27,7 +27,6 @@ import io.confluent.ksql.rest.util.PersistentQueryCleanupImpl; import io.confluent.ksql.rest.util.TerminateCluster; import io.confluent.ksql.services.KafkaTopicClient; -import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.Pair; import io.confluent.ksql.util.PersistentQueryMetadata; import io.confluent.ksql.util.QueryMetadata; @@ -356,8 +355,9 @@ private void executeStatement(final QueuedCommand queuedCommand) { } else { if (!rateLimiter.tryAcquire()) { throw new KsqlRestException( - Errors.tooManyRequests("Too many requests to the command topic within a 1 second timeframe") - ); + Errors.tooManyRequests( + "Too many requests to the command topic within a 1 second timeframe" + )); } statementExecutor.handleStatement(queuedCommand); LOG.info("Executed statement: " + commandStatement); diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java index 63467bac3cd8..ad9f770be849 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java @@ -17,8 +17,10 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; @@ -41,7 +43,9 @@ import io.confluent.ksql.engine.KsqlEngine; import io.confluent.ksql.metrics.MetricCollectors; import io.confluent.ksql.rest.Errors; +import io.confluent.ksql.rest.entity.KsqlErrorMessage; import io.confluent.ksql.rest.server.resources.IncompatibleKsqlCommandVersionException; +import io.confluent.ksql.rest.server.resources.KsqlRestException; import io.confluent.ksql.rest.server.state.ServerState; import io.confluent.ksql.rest.util.ClusterTerminator; import io.confluent.ksql.rest.util.PersistentQueryCleanupImpl; @@ -653,6 +657,38 @@ public void shouldCloseTheCommandRunnerCorrectly() throws Exception { verify(commandStore).close(); } + + @Test + public void shouldThrowWhenRateLimitHit() { + // Given: + final CommandRunner rateLimitedCommandRunner = new CommandRunner( + statementExecutor, + commandStore, + 3, + clusterTerminator, + executor, + serverState, + "ksql-service-id", + Duration.ofMillis(COMMAND_RUNNER_HEALTH_TIMEOUT), + "", + clock, + compactor, + incompatibleCommandChecker, + commandDeserializer, + errorHandler, + commandTopicExists, + new Metrics(), + 1.0 + ); + givenQueuedCommands(queuedCommand1, queuedCommand2, queuedCommand3); + + // When: + // Then: + final KsqlRestException e = assertThrows(KsqlRestException.class, rateLimitedCommandRunner::fetchAndRunCommands); + assertEquals(e.getResponse().getStatus(), 429); + final KsqlErrorMessage errorMessage = (KsqlErrorMessage) e.getResponse().getEntity(); + assertTrue(errorMessage.getMessage().contains("Too many requests to the command topic within a 1 second timeframe")); + } private Runnable getThreadTask() { verify(executor).execute(threadTaskCaptor.capture());