Skip to content

Commit

Permalink
test + review
Browse files Browse the repository at this point in the history
  • Loading branch information
lct45 committed Feb 25, 2022
1 parent 67b1011 commit 5b543be
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -933,7 +933,7 @@ static KsqlRestApplication buildApplication(
internalTopicClient,
commandTopicName,
metricCollectors.getMetrics(),
ksqlConfig.getDouble("ksql.command.topic.rate.limit")
ksqlConfig.getDouble(KsqlConfig.KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG)
);

final KsqlResource ksqlResource = new KsqlResource(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import io.confluent.ksql.rest.util.PersistentQueryCleanupImpl;
import io.confluent.ksql.rest.util.TerminateCluster;
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.Pair;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.QueryMetadata;
Expand Down Expand Up @@ -356,8 +355,9 @@ private void executeStatement(final QueuedCommand queuedCommand) {
} else {
if (!rateLimiter.tryAcquire()) {
throw new KsqlRestException(
Errors.tooManyRequests("Too many requests to the command topic within a 1 second timeframe")
);
Errors.tooManyRequests(
"Too many requests to the command topic within a 1 second timeframe"
));
}
statementExecutor.handleStatement(queuedCommand);
LOG.info("Executed statement: " + commandStatement);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
Expand All @@ -41,7 +43,9 @@
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.metrics.MetricCollectors;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.entity.KsqlErrorMessage;
import io.confluent.ksql.rest.server.resources.IncompatibleKsqlCommandVersionException;
import io.confluent.ksql.rest.server.resources.KsqlRestException;
import io.confluent.ksql.rest.server.state.ServerState;
import io.confluent.ksql.rest.util.ClusterTerminator;
import io.confluent.ksql.rest.util.PersistentQueryCleanupImpl;
Expand Down Expand Up @@ -653,6 +657,38 @@ public void shouldCloseTheCommandRunnerCorrectly() throws Exception {

verify(commandStore).close();
}

@Test
public void shouldThrowWhenRateLimitHit() {
// Given:
final CommandRunner rateLimitedCommandRunner = new CommandRunner(
statementExecutor,
commandStore,
3,
clusterTerminator,
executor,
serverState,
"ksql-service-id",
Duration.ofMillis(COMMAND_RUNNER_HEALTH_TIMEOUT),
"",
clock,
compactor,
incompatibleCommandChecker,
commandDeserializer,
errorHandler,
commandTopicExists,
new Metrics(),
1.0
);
givenQueuedCommands(queuedCommand1, queuedCommand2, queuedCommand3);

// When:
// Then:
final KsqlRestException e = assertThrows(KsqlRestException.class, rateLimitedCommandRunner::fetchAndRunCommands);
assertEquals(e.getResponse().getStatus(), 429);
final KsqlErrorMessage errorMessage = (KsqlErrorMessage) e.getResponse().getEntity();
assertTrue(errorMessage.getMessage().contains("Too many requests to the command topic within a 1 second timeframe"));
}

private Runnable getThreadTask() {
verify(executor).execute(threadTaskCaptor.capture());
Expand Down

0 comments on commit 5b543be

Please sign in to comment.