Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add rate-limiting to ksql command topic #8809

Merged
merged 8 commits into from
Mar 2, 2022

Conversation

lct45
Copy link
Contributor

@lct45 lct45 commented Feb 24, 2022

Description

We recently updated the command topic in #8742 to make sure that it's created with all the proper configs. Now that the command topic can be created on a separate Kafka from the rest of the topics, we want to rate limit the command topic to ensure that it isn't weighing down the kafka it's running on.

Testing done

Tested locally

Reviewer checklist

  • Ensure docs are updated if necessary. (eg. if a user visible feature is being added or changed).
  • Ensure relevant issues are linked (description should include text like "Fixes #")

@lct45 lct45 marked this pull request as ready for review February 25, 2022 15:42
@lct45 lct45 requested a review from a team as a code owner February 25, 2022 15:42
Copy link
Contributor

@wcarlson5 wcarlson5 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couple things. It would be nice to get a test or two nothing crazy. Just so if someone changes related code without knowing about this and break it we will catch it

@@ -346,6 +354,11 @@ private void executeStatement(final QueuedCommand queuedCommand) {
if (closed) {
LOG.info("Execution aborted as system is closing down");
} else {
if (!rateLimiter.tryAcquire()) {
throw new KsqlRestException(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How will this error get handled?

And can you a unit test to make sure its gets thrown and handled?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah writing a unit test rn - it will return an error back up to the user

Copy link
Contributor

@wcarlson5 wcarlson5 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@@ -663,6 +663,12 @@
private static final String KSQL_ENDPOINT_MIGRATE_QUERY_DOC
= "Migrates the /query endpoint to use the same handler as /query-stream.";

public static final String KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG
= "ksql.command.topic.rate.limit";
public static final double KSQL_COMMAND_TOPIC_RATE_LIMIT_CONFIG_DEFAULT = 10.0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Do we need a double or can we use an int?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we set the default much higher (like 1000)? I think 10 is too low for a default and many users may inadvertently hit this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, could you move the config to KsqlRestConfig and follow a similar naming pattern to KSQL_COMMAND_RUNNER_BLOCKED_THRESHHOLD_ERROR_MS and DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG with ksql.server.command prefix? I think we should keep the command topic/command runner configs together and named consistently

Copy link
Contributor Author

@lct45 lct45 Mar 1, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@stevenpyzhang I moved it but now it can't be included in the ImmutableProperties list and we get issues when creating the rate limiter. I upped the limit for now, @rodesai WDYT about doing double max value versus 10? seems ok to me

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes we should definitely disable rate-limiting by default by setting this to MAX_INT. We can enable in cloud as needed.

@jnh5y
Copy link
Member

jnh5y commented Feb 25, 2022

Is this going to cause users scripting commands to have a rough time?

@lct45
Copy link
Contributor Author

lct45 commented Feb 25, 2022

@jnh5y correct me if my understanding is wrong, but users who are scripting commands would also be running their own ksql, right? From my understanding Confluent Cloud users can't write a script to create streams / tables unless it issues a curl request, but it seems like that would take enough time to prevent us getting over 10 requests in a second.

If a user is running their own ksql they can configure this through the properties file

@jnh5y
Copy link
Member

jnh5y commented Feb 25, 2022

Were I running my own ksql, I think having to set this would be somewhat surprising.

For ccloud, I suppose it depends on how someone was scripting things;).

Copy link
Member

@stevenpyzhang stevenpyzhang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought we wanted to rate limit the number of records written to the command topic? This PR looks like it only rate limits how fast we read the records from the command topic, someone could still write a ton of records to the command topic.

@lct45
Copy link
Contributor Author

lct45 commented Feb 28, 2022

error on the cli:

ksql> create stream c1 as select * from riderlocations; 
Too many writes to the command topic within a 1 second timeframe

@wcarlson5
Copy link
Contributor

wcarlson5 commented Feb 28, 2022

@lct45 what happens if you try a batch like this all at once?

Create stream users (A string, B string key) with (kafka_topic='users', VALUE_FORMAT='json', PARTITIONS=1);
create or replace table users_agg0 as select B, count(*) from USERS group by B;
create table users_agg1 as select B, count(*) from USERS group by B;
create table users_agg2 as select B, count(*) from USERS group by B;
create table users_agg3 as select B, count(*) from USERS group by B;
create table users_agg4 as select B, count(*) from USERS group by B;
create table users_agg5 as select B, count(*) from USERS group by B;
create table users_agg6 as select B, count(*) from USERS group by B;
create table users_agg7 as select B, count(*) from USERS group by B;
create table users_agg8 as select B, count(*) from USERS group by B;
create table users_agg9 as select B, count(*) from USERS group by B;
create table users_agg10 as select B, count(*) from USERS group by B;
create table users_agg11 as select B, count(*) from USERS group by B;
create table users_agg12 as select B, count(*) from USERS group by B;
create table users_agg13 as select B, count(*) from USERS group by B;
create table users_agg14 as select B, count(*) from USERS group by B;
create table users_agg15 as select B, count(*) from USERS group by B;
create table users_agg16 as select B, count(*) from USERS group by B;
create table users_agg17 as select B, count(*) from USERS group by B;
create table users_agg18 as select B, count(*) from USERS group by B;

@lct45
Copy link
Contributor Author

lct45 commented Feb 28, 2022

@wcarlson5 it executed all the statements with no errors (I did it as one block from the CLI)

public void shouldThrowIfRateLimitHit() {
// Given:
final DistributingExecutor rateLimitedDistributor = new DistributingExecutor(
new KsqlConfig(ImmutableMap.of("ksql.command.topic.rate.limit", 1.0)),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: use the const in KsqlConfig

if (!rateLimiter.tryAcquire()) {
throw new KsqlRestException(
Errors.tooManyRequests(
"Too many writes to the command topic within a 1 second timeframe"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: change this to "DDL/DML rate is crossing the configured rate limit of statements/second"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

@@ -196,6 +203,13 @@ public StatementExecutorResponse execute(
statement.getStatementText()), e);
}

if (!rateLimiter.tryAcquire()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would make sense to tryAcquire with a delay of a second here. This way we don't fail scripts that serially try to enqueue a large number of commands.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rodesai just added -> will you confirm this is what you were thinking?

@lct45 lct45 requested a review from rodesai March 1, 2022 17:37
Copy link
Contributor

@rodesai rodesai left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@lct45
Copy link
Contributor Author

lct45 commented Mar 2, 2022

@lct45 lct45 merged commit b2f1540 into confluentinc:master Mar 2, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants