feat: add rate-limiting to ksql command topic #8809

Merged
merged 8 commits into from
Mar 2, 2022
Changes from 4 commits
@@ -36,6 +36,7 @@ public final class ImmutableProperties {
.add(KsqlConfig.KSQL_SOURCE_TABLE_MATERIALIZATION_ENABLED)
.add(KsqlConfig.KSQL_HEADERS_COLUMNS_ENABLED)
.addAll(KsqlConfig.SSL_CONFIG_NAMES)
.add(KsqlConfig.KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG)
.build();

private ImmutableProperties() {
13 changes: 13 additions & 0 deletions ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
@@ -663,6 +663,12 @@ public class KsqlConfig extends AbstractConfig {
private static final String KSQL_ENDPOINT_MIGRATE_QUERY_DOC
= "Migrates the /query endpoint to use the same handler as /query-stream.";

public static final String KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG
= "ksql.command.topic.rate.limit";
public static final double KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG_DEFAULT = 10.0;
Contributor

Nit: Do we need a double or can we use an int?

Contributor Author

Member

Can we set the default much higher (like 1000)? I think 10 is too low for a default and many users may inadvertently hit this.

Member

Also, could you move the config to KsqlRestConfig and follow a naming pattern similar to KSQL_COMMAND_RUNNER_BLOCKED_THRESHHOLD_ERROR_MS and DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG, with the ksql.server.command prefix? I think we should keep the command topic/command runner configs together and named consistently.

Contributor Author

@lct45 Mar 1, 2022

@stevenpyzhang I moved it, but now it can't be included in the ImmutableProperties list and we get issues when creating the rate limiter. I upped the limit for now. @rodesai WDYT about using the double max value instead of 10? Seems OK to me.

Contributor

Yes, we should definitely disable rate-limiting by default by setting this to MAX_INT. We can enable it in cloud as needed.

private static final String KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG_DEFAULT_DOC
= "Sets the number of statements that can be executed against the command topic per second";

private enum ConfigGeneration {
LEGACY,
CURRENT
@@ -1429,6 +1435,13 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
Importance.LOW,
KSQL_TRANSIENT_QUERY_CLEANUP_SERVICE_PERIOD_SECONDS_DOC
)
.define(
KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG,
Type.DOUBLE,
KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG_DEFAULT,
Importance.LOW,
KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG_DEFAULT_DOC
)
.withClientSslSupport();

for (final CompatibilityBreakingConfigDef compatibilityBreakingConfigDef
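
The review thread on this file asks whether the limit needs to be a double and what its default should be. As a rough illustration only (this is not code from the PR), the sketch below converts a permits-per-second rate into the spacing between permits for an evenly paced limiter. `RateLimitIntervalSketch` and `nanosBetweenPermits` are hypothetical names. It suggests why a double can be useful (a rate such as 0.5 statements per second cannot be written as an int) and why a very large value effectively disables the limit (the spacing becomes negligible).

```java
// Hypothetical helper for illustration; not part of ksqlDB or Guava.
final class RateLimitIntervalSketch {
  private static final double NANOS_PER_SECOND = 1_000_000_000.0;

  private RateLimitIntervalSketch() {
  }

  // Spacing between permits, in nanoseconds, for a limiter that hands out
  // permitsPerSecond permits at an even pace.
  static double nanosBetweenPermits(final double permitsPerSecond) {
    if (Double.isNaN(permitsPerSecond) || permitsPerSecond <= 0.0) {
      throw new IllegalArgumentException("permitsPerSecond must be positive");
    }
    return NANOS_PER_SECOND / permitsPerSecond;
  }

  public static void main(final String[] args) {
    // 0.5 is only expressible as a double; the two MAX values stand in for
    // the "disable by default" options discussed in the thread.
    final double[] rates = {0.5, 10.0, Integer.MAX_VALUE, Double.MAX_VALUE};
    for (final double rate : rates) {
      System.out.println(
          rate + " permits/s -> " + nanosBetweenPermits(rate) + " ns between permits");
    }
  }
}
```

With a rate of 10.0 the spacing is 100 ms, so a burst of statements from a script could plausibly hit the limit, which matches the concern raised about the original default.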
@@ -932,7 +932,8 @@ static KsqlRestApplication buildApplication(
errorHandler,
internalTopicClient,
commandTopicName,
- metricCollectors.getMetrics()
+ metricCollectors.getMetrics(),
+ ksqlConfig.getDouble(KsqlConfig.KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG)
);

final KsqlResource ksqlResource = new KsqlResource(
@@ -16,10 +16,12 @@
package io.confluent.ksql.rest.server.computation;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.RateLimiter;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.entity.ClusterTerminateRequest;
import io.confluent.ksql.rest.server.resources.IncompatibleKsqlCommandVersionException;
import io.confluent.ksql.rest.server.resources.KsqlRestException;
import io.confluent.ksql.rest.server.state.ServerState;
import io.confluent.ksql.rest.util.ClusterTerminator;
import io.confluent.ksql.rest.util.PersistentQueryCleanupImpl;
@@ -90,6 +92,7 @@ public class CommandRunner implements Closeable {
private final Supplier<Boolean> commandTopicExists;
private boolean commandTopicDeleted;
private Status state = new Status(CommandRunnerStatus.RUNNING, CommandRunnerDegradedReason.NONE);
private RateLimiter rateLimiter;

public enum CommandRunnerStatus {
RUNNING,
@@ -148,7 +151,8 @@ public CommandRunner(
final Errors errorHandler,
final KafkaTopicClient kafkaTopicClient,
final String commandTopicName,
- final Metrics metrics
+ final Metrics metrics,
+ final double rateLimit
) {
this(
statementExecutor,
@@ -169,7 +173,8 @@ public CommandRunner(
commandDeserializer,
errorHandler,
() -> kafkaTopicClient.isTopicExists(commandTopicName),
- metrics
+ metrics,
+ rateLimit
);
}

@@ -191,7 +196,8 @@ public CommandRunner(
final Deserializer<Command> commandDeserializer,
final Errors errorHandler,
final Supplier<Boolean> commandTopicExists,
- final Metrics metrics
+ final Metrics metrics,
+ final double rateLimit
) {
// CHECKSTYLE_RULES.ON: ParameterNumberCheck
this.statementExecutor = Objects.requireNonNull(statementExecutor, "statementExecutor");
@@ -218,6 +224,7 @@ public CommandRunner(
Objects.requireNonNull(commandTopicExists, "commandTopicExists");
this.incompatibleCommandDetected = false;
this.commandTopicDeleted = false;
this.rateLimiter = RateLimiter.create(rateLimit);
}

/**
@@ -346,6 +353,12 @@ private void executeStatement(final QueuedCommand queuedCommand) {
if (closed) {
LOG.info("Execution aborted as system is closing down");
} else {
if (!rateLimiter.tryAcquire()) {
throw new KsqlRestException(
Contributor

How will this error get handled?

And can you add a unit test to make sure it gets thrown and handled?

Contributor Author

Yeah, writing a unit test right now. It will return an error back up to the user.

Errors.tooManyRequests(
"Too many requests to the command topic within a 1 second timeframe"
));
}
statementExecutor.handleStatement(queuedCommand);
LOG.info("Executed statement: " + commandStatement);
}
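
The hunk above calls Guava's no-argument `RateLimiter.tryAcquire()`, which returns immediately with `false` rather than waiting when no permit is available. The sketch below is a deliberately simplified stand-in written with the JDK only, to make that reject-instead-of-wait behavior concrete. It is not Guava's algorithm (for example, it never stores unused permits), and `NonBlockingLimiterSketch` and its injectable nanosecond clock are hypothetical names introduced so the behavior can be exercised deterministically.

```java
import java.util.function.LongSupplier;

// Simplified, hypothetical stand-in for a non-blocking rate limiter.
final class NonBlockingLimiterSketch {
  private final long intervalNanos;
  private final LongSupplier nanoClock;
  private long nextFreeNanos;

  NonBlockingLimiterSketch(final double permitsPerSecond, final LongSupplier nanoClock) {
    if (Double.isNaN(permitsPerSecond) || permitsPerSecond <= 0.0) {
      throw new IllegalArgumentException("permitsPerSecond must be positive");
    }
    // A huge rate rounds down to an interval of 0 ns, i.e. never rejects.
    this.intervalNanos = (long) (1_000_000_000.0 / permitsPerSecond);
    this.nanoClock = nanoClock;
    this.nextFreeNanos = nanoClock.getAsLong();
  }

  // Grants a permit only if one is available right now; never sleeps.
  synchronized boolean tryAcquire() {
    final long now = nanoClock.getAsLong();
    // Compare via subtraction so the check stays valid if the clock wraps.
    if (now - nextFreeNanos < 0) {
      return false;
    }
    nextFreeNanos = now + intervalNanos;
    return true;
  }

  public static void main(final String[] args) {
    final long[] fakeNow = {0L}; // test clock, advanced by hand
    final NonBlockingLimiterSketch limiter =
        new NonBlockingLimiterSketch(1.0, () -> fakeNow[0]);

    System.out.println(limiter.tryAcquire()); // first statement is admitted
    System.out.println(limiter.tryAcquire()); // second one in the same instant is rejected
    fakeNow[0] = 1_000_000_000L;              // one second later
    System.out.println(limiter.tryAcquire()); // admitted again
  }
}
```

In the PR, a rejected `tryAcquire()` is turned into a `KsqlRestException` carrying a 429 response instead of delaying the command runner.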
@@ -17,8 +17,10 @@

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
@@ -41,11 +43,14 @@
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.metrics.MetricCollectors;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.entity.KsqlErrorMessage;
import io.confluent.ksql.rest.server.resources.IncompatibleKsqlCommandVersionException;
import io.confluent.ksql.rest.server.resources.KsqlRestException;
import io.confluent.ksql.rest.server.state.ServerState;
import io.confluent.ksql.rest.util.ClusterTerminator;
import io.confluent.ksql.rest.util.PersistentQueryCleanupImpl;
import io.confluent.ksql.rest.util.TerminateCluster;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.PersistentQueryMetadata;
import java.time.Clock;
import java.time.Duration;
@@ -166,7 +171,8 @@ public void setup() {
commandDeserializer,
errorHandler,
commandTopicExists,
- new Metrics()
+ new Metrics(),
+ KsqlConfig.KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG_DEFAULT
);
}

@@ -651,6 +657,38 @@ public void shouldCloseTheCommandRunnerCorrectly() throws Exception {

verify(commandStore).close();
}

@Test
public void shouldThrowWhenRateLimitHit() {
// Given:
final CommandRunner rateLimitedCommandRunner = new CommandRunner(
statementExecutor,
commandStore,
3,
clusterTerminator,
executor,
serverState,
"ksql-service-id",
Duration.ofMillis(COMMAND_RUNNER_HEALTH_TIMEOUT),
"",
clock,
compactor,
incompatibleCommandChecker,
commandDeserializer,
errorHandler,
commandTopicExists,
new Metrics(),
1.0
);
givenQueuedCommands(queuedCommand1, queuedCommand2, queuedCommand3);

// When:
// Then:
final KsqlRestException e = assertThrows(KsqlRestException.class, rateLimitedCommandRunner::fetchAndRunCommands);
assertEquals(e.getResponse().getStatus(), 429);
final KsqlErrorMessage errorMessage = (KsqlErrorMessage) e.getResponse().getEntity();
assertTrue(errorMessage.getMessage().contains("Too many requests to the command topic within a 1 second timeframe"));
}

private Runnable getThreadTask() {
verify(executor).execute(threadTaskCaptor.capture());
@@ -246,7 +246,8 @@ private class KsqlServer {
errorHandler,
topicClient,
"command_topic",
- new Metrics()
+ new Metrics(),
+ KsqlConfig.KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG_DEFAULT
);

this.ksqlResource = new KsqlResource(
@@ -23,6 +23,7 @@
import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
import static io.netty.handler.codec.http.HttpResponseStatus.PRECONDITION_REQUIRED;
import static io.netty.handler.codec.http.HttpResponseStatus.SERVICE_UNAVAILABLE;
import static io.netty.handler.codec.http.HttpResponseStatus.TOO_MANY_REQUESTS;
import static io.netty.handler.codec.http.HttpResponseStatus.UNAUTHORIZED;

import io.confluent.ksql.rest.entity.KsqlEntityList;
@@ -72,6 +73,8 @@ public final class Errors {
public static final int ERROR_CODE_SERVER_ERROR =
toErrorCode(INTERNAL_SERVER_ERROR.code());

public static final int ERROR_CODE_TOO_MANY_REQUESTS = toErrorCode(TOO_MANY_REQUESTS.code());

private final ErrorMessages errorMessages;

public static int toStatusCode(final int errorCode) {
@@ -208,6 +211,13 @@ public static EndpointResponse serverNotReady(final KsqlErrorMessage error) {
.build();
}

public static EndpointResponse tooManyRequests(final String msg) {
return EndpointResponse.create()
.status(TOO_MANY_REQUESTS.code())
.entity(new KsqlErrorMessage(ERROR_CODE_TOO_MANY_REQUESTS, msg))
.build();
}

public Errors(final ErrorMessages errorMessages) {
this.errorMessages = Objects.requireNonNull(errorMessages, "errorMessages");
}
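
The new `tooManyRequests` helper pairs an HTTP status (429) with a ksqlDB error code derived from it through `toErrorCode`. The diff does not show how that derivation works, so the sketch below assumes, purely for illustration, that the error code is the status multiplied by 100 and that `toStatusCode` reverses it by integer division. `TooManyRequestsSketch`, `ErrorBody`, and the multiplier constant are hypothetical stand-ins for `Errors`, `KsqlErrorMessage`, and whatever the real class uses.

```java
// Hypothetical, JDK-only stand-in for the Errors helper added in this PR.
final class TooManyRequestsSketch {
  static final int TOO_MANY_REQUESTS_STATUS = 429;
  // Assumption for illustration: error code = HTTP status * 100.
  static final int HTTP_TO_ERROR_CODE_MULTIPLIER = 100;

  // Minimal stand-in for an error entity holding a code and a message.
  static final class ErrorBody {
    final int errorCode;
    final String message;

    ErrorBody(final int errorCode, final String message) {
      this.errorCode = errorCode;
      this.message = message;
    }
  }

  private TooManyRequestsSketch() {
  }

  static int toErrorCode(final int statusCode) {
    return statusCode * HTTP_TO_ERROR_CODE_MULTIPLIER;
  }

  static int toStatusCode(final int errorCode) {
    // Integer division drops any sub-code in the last two digits.
    return errorCode / HTTP_TO_ERROR_CODE_MULTIPLIER;
  }

  static ErrorBody tooManyRequests(final String msg) {
    return new ErrorBody(toErrorCode(TOO_MANY_REQUESTS_STATUS), msg);
  }

  public static void main(final String[] args) {
    final ErrorBody body = tooManyRequests(
        "Too many requests to the command topic within a 1 second timeframe");
    System.out.println(body.errorCode + " -> HTTP " + toStatusCode(body.errorCode));
    System.out.println(body.message);
  }
}
```

Under this assumption, a client can recover the HTTP status from the error code alone, which is the kind of check the unit test in this PR performs on the response status and message.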