Skip to content

Commit

Permalink
feat: scalable push query bandwidth throttling (#8087)
Browse files Browse the repository at this point in the history
* feat: throttle scalable push queries by bandwidth

* fix: remove http2 test methods

* test: fix test

* fix: remove http2 fix

* chore: rename scalable to v2

* chore: rename v2

* chore: rename v2

* chore: rename v2

* empty commit
  • Loading branch information
nateab authored Sep 8, 2021
1 parent bdfb628 commit d5af6a1
Show file tree
Hide file tree
Showing 15 changed files with 349 additions and 50 deletions.
15 changes: 15 additions & 0 deletions ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,14 @@ public class KsqlConfig extends AbstractConfig {
= "The maximum amount of pull query bandwidth in megabytes allowed over"
+ " a period of one hour. Once the limit is hit, queries will fail immediately";

public static final String KSQL_QUERY_PUSH_V2_MAX_HOURLY_BANDWIDTH_MEGABYTES_CONFIG
= "ksql.query.push.v2.max.hourly.bandwidth.megabytes";
public static final Integer KSQL_QUERY_PUSH_V2_MAX_HOURLY_BANDWIDTH_MEGABYTES_DEFAULT
= Integer.MAX_VALUE;
public static final String KSQL_QUERY_PUSH_V2_MAX_HOURLY_BANDWIDTH_MEGABYTES_DOC
= "The maximum amount of v2 push query bandwidth in megabytes allowed over"
+ " a period of one hour. Once the limit is hit, queries will fail immediately";

public static final String KSQL_QUERY_PULL_THREAD_POOL_SIZE_CONFIG
= "ksql.query.pull.thread.pool.size";
public static final Integer KSQL_QUERY_PULL_THREAD_POOL_SIZE_DEFAULT = 100;
Expand Down Expand Up @@ -927,6 +935,13 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
Importance.HIGH,
KSQL_QUERY_PULL_MAX_HOURLY_BANDWIDTH_MEGABYTES_DOC
)
.define(
KSQL_QUERY_PUSH_V2_MAX_HOURLY_BANDWIDTH_MEGABYTES_CONFIG,
Type.INT,
KSQL_QUERY_PUSH_V2_MAX_HOURLY_BANDWIDTH_MEGABYTES_DEFAULT,
Importance.HIGH,
KSQL_QUERY_PUSH_V2_MAX_HOURLY_BANDWIDTH_MEGABYTES_DOC
)
.define(
KSQL_QUERY_PULL_THREAD_POOL_SIZE_CONFIG,
Type.INT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public class QueryEndpoint {
private final RateLimiter rateLimiter;
private final ConcurrencyLimiter pullConcurrencyLimiter;
private final SlidingWindowRateLimiter pullBandRateLimiter;
private final SlidingWindowRateLimiter scalablePushBandRateLimiter;
private final HARouting routing;
private final PushRouting pushRouting;
private final Optional<LocalCommands> localCommands;
Expand All @@ -100,6 +101,7 @@ public QueryEndpoint(
final RateLimiter rateLimiter,
final ConcurrencyLimiter pullConcurrencyLimiter,
final SlidingWindowRateLimiter pullBandLimiter,
final SlidingWindowRateLimiter scalablePushBandRateLimiter,
final HARouting routing,
final PushRouting pushRouting,
final Optional<LocalCommands> localCommands
Expand All @@ -112,6 +114,7 @@ public QueryEndpoint(
this.rateLimiter = rateLimiter;
this.pullConcurrencyLimiter = pullConcurrencyLimiter;
this.pullBandRateLimiter = pullBandLimiter;
this.scalablePushBandRateLimiter = scalablePushBandRateLimiter;
this.routing = routing;
this.pushRouting = pushRouting;
this.localCommands = localCommands;
Expand Down Expand Up @@ -170,7 +173,8 @@ public QueryPublisher createQueryPublisher(
serviceContext,
statement,
workerExecutor,
requestProperties
requestProperties,
metricsCallbackHolder
);
} else {
return createPushQueryPublisher(context, serviceContext, statement, workerExecutor);
Expand All @@ -183,9 +187,15 @@ private QueryPublisher createScalablePushQueryPublisher(
final ServiceContext serviceContext,
final ConfiguredStatement<Query> statement,
final WorkerExecutor workerExecutor,
final Map<String, Object> requestProperties
final Map<String, Object> requestProperties,
final MetricsCallbackHolder metricsCallbackHolder
) {
final BlockingQueryPublisher publisher = new BlockingQueryPublisher(context, workerExecutor);
metricsCallbackHolder.setCallback((statusCode, requestBytes, responseBytes, startTimeNanos) -> {
scalablePushBandRateLimiter.add(responseBytes);
});

final BlockingQueryPublisher publisher =
new BlockingQueryPublisher(context, workerExecutor);

final PushQueryConfigRoutingOptions routingOptions =
new PushQueryConfigRoutingOptions(requestProperties);
Expand All @@ -194,6 +204,8 @@ private QueryPublisher createScalablePushQueryPublisher(
ksqlConfig,
statement.getSessionConfig().getOverrides());

scalablePushBandRateLimiter.allow();

final ScalablePushQueryMetadata query = ksqlEngine
.executeScalablePushQuery(analysis, serviceContext, statement, pushRouting, routingOptions,
plannerOptions, context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

/**
* SlidingWindowRateLimiter keeps a log of timestamps and the size for each response returned by
* pull queries. When a response comes, we first pop all outdated timestamps outside of past hour
* queries. When a response comes, we first pop all outdated timestamps outside of past hour
* before appending the new response time and size to the log. Then we decide whether this response
* should be processed depending on whether the log size has exceeded the throttleLimit.
* Many rate limiters require you to ask for access before it's granted whereas this method always
Expand Down Expand Up @@ -55,15 +55,15 @@ public class SlidingWindowRateLimiter {
private final long slidingWindowSizeMs;

/**
* Aggregate of pull query response sizes in the past hour
* Aggregate of query response sizes in the past hour
*/
private long numBytesInWindow;

public SlidingWindowRateLimiter(final int requestLimitInMB, final long slidingWindowSizeMs) {
checkArgument(requestLimitInMB >= 0,
"Pull Query bandwidth limit can't be negative.");
"Query bandwidth limit can't be negative.");
checkArgument(slidingWindowSizeMs >= 0,
"Pull Query throttle window size can't be negative");
"Query throttle window size can't be negative");

this.throttleLimit = (long) requestLimitInMB * NUM_BYTES_IN_ONE_MEGABYTE;
this.slidingWindowSizeMs = slidingWindowSizeMs;
Expand All @@ -72,9 +72,9 @@ public SlidingWindowRateLimiter(final int requestLimitInMB, final long slidingWi
}

/**
* Checks if pull queries have returned more than the throttleLimit in the past hour.
* Checks if queries have returned more than the throttleLimit in the past hour.
* Throws a KsqlException is the limit has been breached
* @throws KsqlException Exception that the throttle limit has been reached for pull queries
* @throws KsqlException Exception that the throttle limit has been reached for queries
*/
public synchronized void allow() throws KsqlException {
this.allow(Time.SYSTEM.milliseconds());
Expand All @@ -90,14 +90,14 @@ protected synchronized void allow(final long timestamp) throws KsqlException {
this.numBytesInWindow -= responseSizesLog.poll().right;
}
if (this.numBytesInWindow > throttleLimit) {
throw new KsqlException("Host is at bandwidth rate limit for pull queries.");
throw new KsqlException("Host is at bandwidth rate limit for queries.");
}
}

/**
* Adds the responseSizeInBytes and its timestamp to the queue of all response sizes
* in the past hour.
* @param responseSizeInBytes pull query response size measured in Bytes
* @param responseSizeInBytes query response size measured in Bytes
*/
public synchronized void add(final long responseSizeInBytes) {
add(Time.SYSTEM.milliseconds(), responseSizeInBytes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ public final class KsqlRestApplication implements Executable {
private final RateLimiter pullQueryRateLimiter;
private final ConcurrencyLimiter pullConcurrencyLimiter;
private final SlidingWindowRateLimiter pullBandRateLimiter;
private final SlidingWindowRateLimiter scalablePushBandRateLimiter;
private final HARouting pullQueryRouting;
private final Optional<LocalCommands> localCommands;

Expand Down Expand Up @@ -243,6 +244,7 @@ public static SourceName getCommandsStreamName() {
final RateLimiter pullQueryRateLimiter,
final ConcurrencyLimiter pullConcurrencyLimiter,
final SlidingWindowRateLimiter pullBandRateLimiter,
final SlidingWindowRateLimiter scalablePushBandRateLimiter,
final HARouting pullQueryRouting,
final PushRouting pushQueryRouting,
final Optional<LocalCommands> localCommands
Expand Down Expand Up @@ -302,6 +304,8 @@ public static SourceName getCommandsStreamName() {
this.pullQueryRateLimiter = requireNonNull(pullQueryRateLimiter, "pullQueryRateLimiter");
this.pullConcurrencyLimiter = requireNonNull(pullConcurrencyLimiter, "pullConcurrencyLimiter");
this.pullBandRateLimiter = requireNonNull(pullBandRateLimiter, "pullBandRateLimiter");
this.scalablePushBandRateLimiter =
requireNonNull(scalablePushBandRateLimiter, "scalablePushBandRateLimiter");
this.pullQueryRouting = requireNonNull(pullQueryRouting, "pullQueryRouting");
this.pushQueryRouting = pushQueryRouting;
this.localCommands = requireNonNull(localCommands, "localCommands");
Expand Down Expand Up @@ -349,6 +353,7 @@ public void startAsync() {
pullQueryRateLimiter,
pullConcurrencyLimiter,
pullBandRateLimiter,
scalablePushBandRateLimiter,
pullQueryRouting,
localCommands,
pushQueryRouting
Expand Down Expand Up @@ -376,6 +381,7 @@ public void startAsync() {
pullQueryRateLimiter,
pullConcurrencyLimiter,
pullBandRateLimiter,
scalablePushBandRateLimiter,
pullQueryRouting,
pushQueryRouting,
localCommands
Expand Down Expand Up @@ -792,6 +798,10 @@ static KsqlRestApplication buildApplication(
final SlidingWindowRateLimiter pullBandRateLimiter = new SlidingWindowRateLimiter(
ksqlConfig.getInt(KsqlConfig.KSQL_QUERY_PULL_MAX_HOURLY_BANDWIDTH_MEGABYTES_CONFIG),
NUM_MILLISECONDS_IN_HOUR);
final SlidingWindowRateLimiter scalablePushBandRateLimiter = new SlidingWindowRateLimiter(
ksqlConfig.getInt(
KsqlConfig.KSQL_QUERY_PUSH_V2_MAX_HOURLY_BANDWIDTH_MEGABYTES_CONFIG),
NUM_MILLISECONDS_IN_HOUR);
final DenyListPropertyValidator denyListPropertyValidator = new DenyListPropertyValidator(
ksqlConfig.getList(KsqlConfig.KSQL_PROPERTIES_OVERRIDES_DENYLIST));

Expand Down Expand Up @@ -826,6 +836,7 @@ static KsqlRestApplication buildApplication(
pullQueryRateLimiter,
pullQueryConcurrencyLimiter,
pullBandRateLimiter,
scalablePushBandRateLimiter,
pullQueryRouting,
pushQueryRouting,
localCommands
Expand Down Expand Up @@ -905,6 +916,7 @@ static KsqlRestApplication buildApplication(
pullQueryRateLimiter,
pullQueryConcurrencyLimiter,
pullBandRateLimiter,
scalablePushBandRateLimiter,
pullQueryRouting,
pushQueryRouting,
localCommands
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ public class KsqlServerEndpoints implements Endpoints {
private final RateLimiter rateLimiter;
private final ConcurrencyLimiter pullConcurrencyLimiter;
private final SlidingWindowRateLimiter pullBandRateLimiter;
private final SlidingWindowRateLimiter scalablePushBandRateLimiter;
private final HARouting routing;
private final PushRouting pushRouting;
private final Optional<LocalCommands> localCommands;
Expand All @@ -118,6 +119,7 @@ public KsqlServerEndpoints(
final RateLimiter rateLimiter,
final ConcurrencyLimiter pullConcurrencyLimiter,
final SlidingWindowRateLimiter pullBandRateLimiter,
final SlidingWindowRateLimiter scalablePushBandRateLimiter,
final HARouting routing,
final PushRouting pushRouting,
final Optional<LocalCommands> localCommands
Expand All @@ -144,6 +146,7 @@ public KsqlServerEndpoints(
this.rateLimiter = Objects.requireNonNull(rateLimiter);
this.pullConcurrencyLimiter = pullConcurrencyLimiter;
this.pullBandRateLimiter = Objects.requireNonNull(pullBandRateLimiter);
this.scalablePushBandRateLimiter = Objects.requireNonNull(scalablePushBandRateLimiter);
this.routing = Objects.requireNonNull(routing);
this.pushRouting = pushRouting;
this.localCommands = Objects.requireNonNull(localCommands);
Expand All @@ -164,8 +167,8 @@ public CompletableFuture<QueryPublisher> createQueryPublisher(final String sql,
try {
return new QueryEndpoint(
ksqlEngine, ksqlConfig, ksqlRestConfig, routingFilterFactory, pullQueryMetrics,
rateLimiter, pullConcurrencyLimiter, pullBandRateLimiter, routing, pushRouting,
localCommands)
rateLimiter, pullConcurrencyLimiter, pullBandRateLimiter, scalablePushBandRateLimiter,
routing, pushRouting, localCommands)
.createQueryPublisher(
sql,
properties,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.analyzer.ImmutableAnalysis;
import io.confluent.ksql.api.server.SlidingWindowRateLimiter;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.physical.scalablepush.PushRouting;
Expand Down Expand Up @@ -59,6 +60,7 @@ final class PushQueryPublisher implements Flow.Publisher<Collection<StreamedRow>
private final PushRouting pushRouting;
private final boolean isScalablePush;
private final Context context;
private final SlidingWindowRateLimiter scalablePushBandRateLimiter;

private PushQueryPublisher(
final KsqlEngine ksqlEngine,
Expand All @@ -75,6 +77,7 @@ private PushQueryPublisher(
this.pushRouting = null;
this.isScalablePush = false;
this.context = null;
this.scalablePushBandRateLimiter = null;
}

private PushQueryPublisher(
Expand All @@ -84,7 +87,8 @@ private PushQueryPublisher(
final ConfiguredStatement<Query> query,
final ImmutableAnalysis analysis,
final PushRouting pushRouting,
final Context context
final Context context,
final SlidingWindowRateLimiter scalablePushBandRateLimiter
) {
this.ksqlEngine = requireNonNull(ksqlEngine, "ksqlEngine");
this.serviceContext = requireNonNull(serviceContext, "serviceContext");
Expand All @@ -94,6 +98,8 @@ private PushQueryPublisher(
this.pushRouting = requireNonNull(pushRouting, "pushRouting");
this.isScalablePush = true;
this.context = requireNonNull(context, "context");
this.scalablePushBandRateLimiter =
requireNonNull(scalablePushBandRateLimiter, "scalablePushBandRateLimiter");
}

public static PushQueryPublisher createPublisher(
Expand All @@ -113,7 +119,8 @@ public static PushQueryPublisher createScalablePublisher(
final ConfiguredStatement<Query> query,
final ImmutableAnalysis analysis,
final PushRouting pushRouting,
final Context context
final Context context,
final SlidingWindowRateLimiter scalablePushBandRateLimiter
) {
return new PushQueryPublisher(
ksqlEngine,
Expand All @@ -122,7 +129,8 @@ public static PushQueryPublisher createScalablePublisher(
query,
analysis,
pushRouting,
context
context,
scalablePushBandRateLimiter
);
}

Expand All @@ -138,6 +146,8 @@ public synchronized void subscribe(final Flow.Subscriber<Collection<StreamedRow>
query.getSessionConfig().getConfig(false),
query.getSessionConfig().getOverrides());

scalablePushBandRateLimiter.allow();

final ImmutableAnalysis analysis =
ksqlEngine.analyzeQueryWithNoOutputTopic(query.getStatement(), query.getStatementText());
final ScalablePushQueryMetadata pushQueryMetadata = ksqlEngine
Expand Down
Loading

0 comments on commit d5af6a1

Please sign in to comment.