Skip to content

Commit

Permalink
query limits
Browse files Browse the repository at this point in the history
  • Loading branch information
Tim Fox committed Feb 18, 2020
1 parent 0cd9a95 commit 7638743
Show file tree
Hide file tree
Showing 20 changed files with 346 additions and 198 deletions.
45 changes: 5 additions & 40 deletions ksql-api/src/main/java/io/confluent/ksql/api/impl/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.impl.VertxThread;

/**
* General purpose utils (not limited to the server, could be used by client too) for the API
Expand Down Expand Up @@ -49,53 +48,19 @@ public static <T> void connectPromise(final Future<T> future, final Promise<T> p
}

public static void checkIsWorker() {
checkThread(true);
}

public static void checkIsNotWorker() {
checkThread(false);
}

public static boolean isEventLoopThread() {
return isWorkerThread(false);
}

public static boolean isWorkerThread() {
return isWorkerThread(true);
}

private static boolean isWorkerThread(final boolean worker) {
final Thread thread = Thread.currentThread();
if (!(thread instanceof VertxThread)) {
throw new IllegalStateException("Not a Vert.x thread " + thread);
}
final VertxThread vertxThread = (VertxThread) thread;
return vertxThread.isWorker() == worker;
}

private static void checkThread(final boolean worker) {
if (!isWorkerThread(worker)) {
throw new IllegalStateException("Not a " + (worker ? "worker" : "event loop") + " thread");
if (!Context.isOnWorkerThread()) {
throw new IllegalStateException("Not a worker thread");
}
}

public static void checkContext(final Context context) {
checkIsNotWorker();
if (context != Vertx.currentContext()) {
throw new IllegalStateException("On wrong context");
if (!isEventLoopAndSameContext(context)) {
throw new IllegalStateException("On wrong context or worker");
}
}

public static boolean isEventLoopAndSameContext(final Context context) {
final Thread thread = Thread.currentThread();
if (!(thread instanceof VertxThread)) {
return false;
}
final VertxThread vertxThread = (VertxThread) thread;
if (vertxThread.isWorker()) {
return false;
}
return context == Vertx.currentContext();
return Context.isOnEventLoopThread() && context == Vertx.currentContext();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ public int queueSize() {
return queue.size();
}

@Override
public boolean isPullQuery() {
return false;
}

@Override
protected void maybeSend() {
ctx.runOnContext(v -> doSend());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public KsqlServerEndpoints(
this.reservedInternalTopics = new ReservedInternalTopics(ksqlConfig);
}

@Override
public QueryPublisher createQueryPublisher(
final String sql, final JsonObject properties,
final Context context,
Expand All @@ -105,6 +106,7 @@ public QueryPublisher createQueryPublisher(
private QueryPublisher createPushQueryPublisher(final Context context,
final ServiceContext serviceContext,
final ConfiguredStatement<Query> statement, final WorkerExecutor workerExecutor) {

final BlockingQueryPublisher publisher = new BlockingQueryPublisher(context,
workerExecutor);
final QueryMetadata queryMetadata = ksqlEngine
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,9 @@ public List<String> getColumnNames() {
public List<String> getColumnTypes() {
return columnTypes;
}

@Override
public boolean isPullQuery() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ public class ApiServerConfig extends AbstractConfig {
"Max number of worker threads for executing blocking code";
public static final int DEFAULT_WORKER_POOL_SIZE = 100;

public static final String MAX_PUSH_QUERIES = propertyName("max.push.queries");
public static final int DEFAULT_MAX_PUSH_QUERIES = 100;
public static final String MAX_PUSH_QUERIES_DOC =
"The maximum number of push queries allowed on the server at any one time";

private static String propertyName(final String name) {
return KsqlConfig.KSQL_CONFIG_PROPERTY_PREFIX + PROPERTY_PREFIX + name;
}
Expand Down Expand Up @@ -141,7 +146,13 @@ private static String propertyName(final String name) {
Type.INT,
DEFAULT_WORKER_POOL_SIZE,
Importance.MEDIUM,
WORKER_POOL_DOC);
WORKER_POOL_DOC)
.define(
MAX_PUSH_QUERIES,
Type.INT,
DEFAULT_MAX_PUSH_QUERIES,
Importance.MEDIUM,
MAX_PUSH_QUERIES_DOC);

public ApiServerConfig(final Map<?, ?> map) {
super(CONFIG_DEF, map);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import io.confluent.ksql.api.impl.Utils;
import io.vertx.core.Context;
import io.vertx.core.Vertx;
import java.util.Objects;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
Expand Down Expand Up @@ -51,7 +50,7 @@ public BasePublisher(final Context ctx) {
@Override
public void subscribe(final Subscriber<? super T> subscriber) {
Objects.requireNonNull(subscriber);
if (Vertx.currentContext() == ctx && !Utils.isWorkerThread()) {
if (Utils.isEventLoopAndSameContext(ctx)) {
doSubscribe(subscriber);
} else {
ctx.runOnContext(v -> doSubscribe(subscriber));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public PushQueryHolder createApiQuery(
final ConnectionQueries connectionQueries = getConnectionQueries(request);
final PushQueryHolder query = new PushQueryHolder(server,
queryPublisher, connectionQueries::removeQuery);
server.registerQuery(query);
connectionQueries.addQuery(query);
return query;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ private ErrorCodes() {
public static final int ERROR_CODE_INVALID_QUERY = 5;
public static final int ERROR_CODE_MISSING_KEY_FIELD = 6;
public static final int ERROR_CODE_CANNOT_COERCE_FIELD = 7;
public static final int ERROR_MAX_PUSH_QUERIES_EXCEEDED = 8;


public static final int ERROR_CODE_INTERNAL_ERROR = 100;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ private class RequestHandler {
}

public void handleBodyBuffer(final Buffer buff) {

if (responseEnded) {
// Ignore further buffers from request if response has been written (most probably due
// to error)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ public class PushQueryHolder {
this.queryPublisher = Objects.requireNonNull(queryPublisher);
this.closeHandler = Objects.requireNonNull(closeHandler);
this.id = new PushQueryId(UUID.randomUUID().toString());
server.registerQuery(this);
}

public void close() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import io.confluent.ksql.util.KsqlStatementException;
import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.WorkerExecutor;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.RoutingContext;
import java.util.Objects;
Expand All @@ -45,16 +44,16 @@ public class QueryStreamHandler implements Handler<RoutingContext> {
private final Endpoints endpoints;
private final ConnectionQueryManager connectionQueryManager;
private final Context context;
private final WorkerExecutor workerExecutor;
private final Server server;

public QueryStreamHandler(final Endpoints endpoints,
final ConnectionQueryManager connectionQueryManager,
final Context context,
final WorkerExecutor workerExecutor) {
final Server server) {
this.endpoints = Objects.requireNonNull(endpoints);
this.connectionQueryManager = Objects.requireNonNull(connectionQueryManager);
this.context = Objects.requireNonNull(context);
this.workerExecutor = Objects.requireNonNull(workerExecutor);
this.server = Objects.requireNonNull(server);
}

@Override
Expand All @@ -80,10 +79,11 @@ public void handle(final RoutingContext routingContext) {
createQueryPublisherAsync(queryStreamArgs.get().sql, queryStreamArgs.get().properties, context)
.thenAccept(queryPublisher -> {

final PushQueryHolder query = connectionQueryManager
.createApiQuery(queryPublisher, routingContext.request());
final PushQueryHolder query = queryPublisher.isPullQuery() ? null :
connectionQueryManager.createApiQuery(queryPublisher, routingContext.request());

final QueryResponseMetadata metadata = new QueryResponseMetadata(query.getId().toString(),
final QueryResponseMetadata metadata = new QueryResponseMetadata(
query == null ? null : query.getId().toString(),
queryPublisher.getColumnNames(),
queryPublisher.getColumnTypes());

Expand All @@ -95,8 +95,10 @@ public void handle(final RoutingContext routingContext) {

queryPublisher.subscribe(querySubscriber);

// When response is complete, publisher should be closed and query unregistered
routingContext.response().endHandler(v -> query.close());
if (query != null) {
// When response is complete, publisher should be closed and query unregistered
routingContext.response().endHandler(v -> query.close());
}
})
.exceptionally(t -> handleQueryPublisherException(t, routingContext));
}
Expand All @@ -110,6 +112,10 @@ private Void handleQueryPublisherException(final Throwable t,
ServerUtils.handleError(routingContext.response(), 400, ErrorCodes.ERROR_CODE_INVALID_QUERY,
actual.getMessage());
return null;
} else if (actual instanceof KsqlApiException) {
ServerUtils
.handleError(routingContext.response(), 400, ((KsqlApiException) actual).getErrorCode(),
actual.getMessage());
}
}
// We don't expose internal error message via public API
Expand All @@ -124,9 +130,9 @@ private Void handleQueryPublisherException(final Throwable t,
private CompletableFuture<QueryPublisher> createQueryPublisherAsync(final String sql,
final JsonObject properties, final Context context) {
final VertxCompletableFuture<QueryPublisher> vcf = new VertxCompletableFuture<>();
workerExecutor.executeBlocking(
server.getWorkerExecutor().executeBlocking(
p -> p.complete(
endpoints.createQueryPublisher(sql, properties, context, workerExecutor)),
endpoints.createQueryPublisher(sql, properties, context, server.getWorkerExecutor())),
false,
vcf);
return vcf;
Expand Down
15 changes: 12 additions & 3 deletions ksql-api/src/main/java/io/confluent/ksql/api/server/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,15 @@ public class Server {
private final Endpoints endpoints;
private final Map<PushQueryId, PushQueryHolder> queries = new ConcurrentHashMap<>();
private final Set<HttpConnection> connections = new ConcurrentHashSet<>();
private final int maxPushQueryCount;
private String deploymentID;
private WorkerExecutor workerExecutor;

public Server(final Vertx vertx, final ApiServerConfig config, final Endpoints endpoints) {
this.vertx = Objects.requireNonNull(vertx);
this.config = Objects.requireNonNull(config);
this.endpoints = Objects.requireNonNull(endpoints);
this.maxPushQueryCount = config.getInt(ApiServerConfig.MAX_PUSH_QUERIES);
}

public synchronized void start() {
Expand All @@ -69,9 +71,8 @@ public synchronized void start() {
config.getInt(ApiServerConfig.WORKER_POOL_SIZE));
log.debug("Deploying " + options.getInstances() + " instances of server verticle");
final VertxCompletableFuture<String> future = new VertxCompletableFuture<>();
final WorkerExecutor theWorkerExecutor = workerExecutor;
vertx.deployVerticle(() ->
new ServerVerticle(endpoints, createHttpServerOptions(config), this, theWorkerExecutor),
new ServerVerticle(endpoints, createHttpServerOptions(config), this),
options,
future);
try {
Expand Down Expand Up @@ -99,8 +100,16 @@ public synchronized void stop() {
log.info("API server stopped");
}

void registerQuery(final PushQueryHolder query) {
public WorkerExecutor getWorkerExecutor() {
return workerExecutor;
}

synchronized void registerQuery(final PushQueryHolder query) {
Objects.requireNonNull(query);
if (queries.size() == maxPushQueryCount) {
throw new KsqlApiException("Maximum number of push queries exceeded",
ErrorCodes.ERROR_MAX_PUSH_QUERIES_EXCEEDED);
}
if (queries.putIfAbsent(query.getId(), query) != null) {
// It should never happen
// https://stackoverflow.com/questions/2513573/how-good-is-javas-uuid-randomuuid
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.WorkerExecutor;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
Expand All @@ -40,16 +39,14 @@ public class ServerVerticle extends AbstractVerticle {
private final Endpoints endpoints;
private final HttpServerOptions httpServerOptions;
private final Server server;
private final WorkerExecutor workerExecutor;
private ConnectionQueryManager connectionQueryManager;
private HttpServer httpServer;

public ServerVerticle(final Endpoints endpoints, final HttpServerOptions httpServerOptions,
final Server server, final WorkerExecutor workerExecutor) {
final Server server) {
this.endpoints = endpoints;
this.httpServerOptions = httpServerOptions;
this.server = server;
this.workerExecutor = workerExecutor;
}

@Override
Expand Down Expand Up @@ -79,11 +76,11 @@ private Router setupRouter() {
.produces("application/json")
.handler(BodyHandler.create())
.handler(new QueryStreamHandler(endpoints, connectionQueryManager, context,
workerExecutor));
server));
router.route(HttpMethod.POST, "/inserts-stream")
.produces("application/vnd.ksqlapi.delimited.v1")
.produces("application/json")
.handler(new InsertsStreamHandler(context, endpoints, workerExecutor));
.handler(new InsertsStreamHandler(context, endpoints, server.getWorkerExecutor()));
router.route(HttpMethod.POST, "/close-query").handler(BodyHandler.create())
.handler(new CloseQueryHandler(server));
return router;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class QueryResponseMetadata extends SerializableObject {

public QueryResponseMetadata(final String queryId, final List<String> columnNames,
final List<String> columnTypes) {
this.queryId = Objects.requireNonNull(queryId);
this.queryId = queryId;
this.columnNames = Objects.requireNonNull(columnNames);
this.columnTypes = Objects.requireNonNull(columnTypes);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,9 @@ public interface QueryPublisher extends Publisher<GenericRow> {
*/
void close();

/**
* @return true if pull query
*/
boolean isPullQuery();

}
Loading

0 comments on commit 7638743

Please sign in to comment.