Skip to content

Commit

Permalink
make rate limit quota set by config
Browse files Browse the repository at this point in the history
  • Loading branch information
lct45 committed Feb 25, 2022
1 parent b26d7b9 commit fba9fe4
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public final class ImmutableProperties {
.add(KsqlConfig.KSQL_SOURCE_TABLE_MATERIALIZATION_ENABLED)
.add(KsqlConfig.KSQL_HEADERS_COLUMNS_ENABLED)
.addAll(KsqlConfig.SSL_CONFIG_NAMES)
.add(KsqlConfig.KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG)
.build();

private ImmutableProperties() {
Expand Down
13 changes: 13 additions & 0 deletions ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,12 @@ public class KsqlConfig extends AbstractConfig {
private static final String KSQL_ENDPOINT_MIGRATE_QUERY_DOC
= "Migrates the /query endpoint to use the same handler as /query-stream.";

public static final String KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG
= "ksql.command.topic.rate.limit";
public static final double KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG_DEFAULT = 10.0;
private static final String KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG_DEFAULT_DOC
= "Sets the number of statements that can be executed against the command topic per second";

private enum ConfigGeneration {
LEGACY,
CURRENT
Expand Down Expand Up @@ -1429,6 +1435,13 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
Importance.LOW,
KSQL_TRANSIENT_QUERY_CLEANUP_SERVICE_PERIOD_SECONDS_DOC
)
.define(
KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG,
Type.DOUBLE,
KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG_DEFAULT,
Importance.LOW,
KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG_DEFAULT_DOC
)
.withClientSslSupport();

for (final CompatibilityBreakingConfigDef compatibilityBreakingConfigDef
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -932,7 +932,8 @@ static KsqlRestApplication buildApplication(
errorHandler,
internalTopicClient,
commandTopicName,
metricCollectors.getMetrics()
metricCollectors.getMetrics(),
ksqlConfig.getDouble("ksql.command.topic.rate.limit")
);

final KsqlResource ksqlResource = new KsqlResource(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ public CommandRunner(
final Errors errorHandler,
final KafkaTopicClient kafkaTopicClient,
final String commandTopicName,
final Metrics metrics
final Metrics metrics,
final double rateLimit
) {
this(
statementExecutor,
Expand All @@ -171,7 +172,8 @@ public CommandRunner(
commandDeserializer,
errorHandler,
() -> kafkaTopicClient.isTopicExists(commandTopicName),
metrics
metrics,
rateLimit
);
}

Expand All @@ -193,7 +195,8 @@ public CommandRunner(
final Deserializer<Command> commandDeserializer,
final Errors errorHandler,
final Supplier<Boolean> commandTopicExists,
final Metrics metrics
final Metrics metrics,
final double rateLimit
) {
// CHECKSTYLE_RULES.ON: ParameterNumberCheck
this.statementExecutor = Objects.requireNonNull(statementExecutor, "statementExecutor");
Expand All @@ -220,7 +223,7 @@ public CommandRunner(
Objects.requireNonNull(commandTopicExists, "commandTopicExists");
this.incompatibleCommandDetected = false;
this.commandTopicDeleted = false;
this.rateLimiter = RateLimiter.create(500.0);
this.rateLimiter = RateLimiter.create(rateLimit);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import io.confluent.ksql.rest.util.ClusterTerminator;
import io.confluent.ksql.rest.util.PersistentQueryCleanupImpl;
import io.confluent.ksql.rest.util.TerminateCluster;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.PersistentQueryMetadata;
import java.time.Clock;
import java.time.Duration;
Expand Down Expand Up @@ -166,7 +167,8 @@ public void setup() {
commandDeserializer,
errorHandler,
commandTopicExists,
new Metrics()
new Metrics(),
KsqlConfig.KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG_DEFAULT
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,8 @@ private class KsqlServer {
errorHandler,
topicClient,
"command_topic",
new Metrics()
new Metrics(),
KsqlConfig.KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG_DEFAULT
);

this.ksqlResource = new KsqlResource(
Expand Down

0 comments on commit fba9fe4

Please sign in to comment.