diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/KsqlExecutionContext.java b/ksqldb-engine/src/main/java/io/confluent/ksql/KsqlExecutionContext.java index 9c6271e194c4..fa4d190e423c 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/KsqlExecutionContext.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/KsqlExecutionContext.java @@ -40,6 +40,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ExecutorService; /** * The context in which statements can be executed. @@ -154,6 +155,7 @@ PullQueryResult executePullQuery( ConfiguredStatement statement, RoutingFilterFactory routingFilterFactory, RoutingOptions routingOptions, + ExecutorService executorService, Optional pullQueryMetrics ); diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java index afd37e51a0a4..ae0aaeaf2d4b 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java @@ -68,6 +68,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; /** @@ -142,6 +143,7 @@ PullQueryResult executePullQuery( final ConfiguredStatement statement, final RoutingFilterFactory routingFilterFactory, final RoutingOptions routingOptions, + final ExecutorService executorService, final Optional pullQueryMetrics ) { @@ -164,12 +166,11 @@ PullQueryResult executePullQuery( ksqlConfig, analysis, statement); - - try (HARouting routing = new HARouting( - ksqlConfig, physicalPlan, routingFilterFactory, routingOptions, statement, serviceContext, - physicalPlan.getOutputSchema(), physicalPlan.getQueryId(), pullQueryMetrics)) { - return routing.handlePullQuery(); - } + final HARouting routing = new HARouting( + physicalPlan, routingFilterFactory, routingOptions, statement, serviceContext, + physicalPlan.getOutputSchema(), physicalPlan.getQueryId(), executorService, + pullQueryMetrics); + return routing.handlePullQuery(); } catch (final Exception e) { pullQueryMetrics.ifPresent(metrics -> metrics.recordErrorRate(1)); throw new KsqlStatementException( diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java index 867ca55fa0cc..1f5a30bc123f 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java @@ -52,6 +52,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -268,6 +269,7 @@ public PullQueryResult executePullQuery( final ConfiguredStatement statement, final RoutingFilterFactory routingFilterFactory, final RoutingOptions routingOptions, + final ExecutorService executorService, final Optional pullQueryMetrics ) { return EngineExecutor @@ -280,6 +282,7 @@ public PullQueryResult executePullQuery( statement, routingFilterFactory, routingOptions, + executorService, pullQueryMetrics ); } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/SandboxedExecutionContext.java b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/SandboxedExecutionContext.java index efdca11113f6..d86c9bd509be 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/SandboxedExecutionContext.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/SandboxedExecutionContext.java @@ -40,6 +40,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ExecutorService; /** * An execution context that can execute statements without changing the core engine's state @@ -164,6 +165,7 @@ public PullQueryResult executePullQuery( final ConfiguredStatement statement, final RoutingFilterFactory routingFilterFactory, final RoutingOptions routingOptions, + final ExecutorService executorService, final Optional pullQueryMetrics ) { return EngineExecutor.create( @@ -174,6 +176,7 @@ public PullQueryResult executePullQuery( statement, routingFilterFactory, routingOptions, + executorService, pullQueryMetrics ); } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/HARouting.java b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/HARouting.java index b1af14949e76..526720b821a9 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/HARouting.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/HARouting.java @@ -31,7 +31,6 @@ import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.statement.ConfiguredStatement; -import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.KsqlRequestConfig; import io.confluent.ksql.util.KsqlServerException; @@ -47,7 +46,6 @@ import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.stream.Collectors; import org.slf4j.Logger; @@ -68,7 +66,6 @@ public final class HARouting implements AutoCloseable { private final RouteQuery routeQuery; public HARouting( - final KsqlConfig ksqlConfig, final PullPhysicalPlan pullPhysicalPlan, final RoutingFilterFactory routingFilterFactory, final RoutingOptions routingOptions, @@ -76,15 +73,15 @@ public HARouting( final ServiceContext serviceContext, final LogicalSchema outputSchema, final QueryId queryId, + final ExecutorService executorService, final Optional pullQueryMetrics ) { - this(ksqlConfig, pullPhysicalPlan, routingFilterFactory, routingOptions, statement, - serviceContext, outputSchema, queryId, pullQueryMetrics, HARouting::executeOrRouteQuery); + this(pullPhysicalPlan, routingFilterFactory, routingOptions, statement, serviceContext, + outputSchema, queryId, executorService, pullQueryMetrics, HARouting::executeOrRouteQuery); } @VisibleForTesting HARouting( - final KsqlConfig ksqlConfig, final PullPhysicalPlan pullPhysicalPlan, final RoutingFilterFactory routingFilterFactory, final RoutingOptions routingOptions, @@ -92,6 +89,7 @@ public HARouting( final ServiceContext serviceContext, final LogicalSchema outputSchema, final QueryId queryId, + final ExecutorService executorService, final Optional pullQueryMetrics, final RouteQuery routeQuery ) { @@ -103,11 +101,9 @@ public HARouting( this.serviceContext = Objects.requireNonNull(serviceContext, "serviceContext"); this.outputSchema = Objects.requireNonNull(outputSchema, "outputSchema"); this.queryId = Objects.requireNonNull(queryId, "queryId"); + this.executorService = Objects.requireNonNull(executorService, "executorService"); this.pullQueryMetrics = Objects.requireNonNull(pullQueryMetrics, "pullQueryMetrics"); - this.executorService = Executors.newFixedThreadPool( - ksqlConfig.getInt(KsqlConfig.KSQL_QUERY_PULL_THREAD_POOL_SIZE_CONFIG) - ); - this.routeQuery = Objects.requireNonNull(routeQuery, "routeQuery"); + this.routeQuery = routeQuery; } public PullQueryResult handlePullQuery() throws InterruptedException { diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/physical/pull/HARoutingTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/physical/pull/HARoutingTest.java index 6c148d2da891..7d2146bbd33b 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/physical/pull/HARoutingTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/physical/pull/HARoutingTest.java @@ -39,10 +39,10 @@ import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.statement.ConfiguredStatement; -import io.confluent.ksql.util.KsqlConfig; import java.util.ArrayList; import java.util.List; import java.util.Optional; +import java.util.concurrent.Executors; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -82,8 +82,6 @@ public class HARoutingTest { @Mock private RoutingFilterFactory routingFilterFactory; @Mock - private KsqlConfig ksqlConfig; - @Mock private PullPhysicalPlan pullPhysicalPlan; @Mock private Materialization materialization; @@ -101,12 +99,10 @@ public void setUp() { when(location2.getNodes()).thenReturn(ImmutableList.of(node2, node1)); when(location3.getNodes()).thenReturn(ImmutableList.of(node1, node2)); when(location4.getNodes()).thenReturn(ImmutableList.of(node2, node1)); - when(ksqlConfig.getInt(KsqlConfig.KSQL_QUERY_PULL_THREAD_POOL_SIZE_CONFIG)) - .thenReturn(1); haRouting = new HARouting( - ksqlConfig, pullPhysicalPlan, routingFilterFactory, routingOptions, statement, - serviceContext, logicalSchema, queryId, Optional.empty(), routeQuery); + pullPhysicalPlan, routingFilterFactory, routingOptions, statement, serviceContext, + logicalSchema, queryId, Executors.newFixedThreadPool(1), Optional.empty(), routeQuery); } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java index d36d336522d3..0c40c3b05766 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java @@ -49,6 +49,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; import org.apache.kafka.common.utils.Time; @@ -59,19 +60,22 @@ public class QueryEndpoint { private final RoutingFilterFactory routingFilterFactory; private final Optional pullQueryMetrics; private final RateLimiter rateLimiter; + private final ExecutorService pullExecutorService; public QueryEndpoint( final KsqlEngine ksqlEngine, final KsqlConfig ksqlConfig, final RoutingFilterFactory routingFilterFactory, final Optional pullQueryMetrics, - final RateLimiter rateLimiter + final RateLimiter rateLimiter, + final ExecutorService pullExecutorService ) { this.ksqlEngine = ksqlEngine; this.ksqlConfig = ksqlConfig; this.routingFilterFactory = routingFilterFactory; this.pullQueryMetrics = pullQueryMetrics; this.rateLimiter = rateLimiter; + this.pullExecutorService = pullExecutorService; } public QueryPublisher createQueryPublisher( @@ -133,6 +137,7 @@ private QueryPublisher createPullQueryPublisher( statement, routingFilterFactory, routingOptions, + pullExecutorService, pullQueryMetrics ); diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index ae82f9e1cab2..6efcd15cb8d8 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -128,6 +128,7 @@ import java.util.Properties; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -188,6 +189,7 @@ public final class KsqlRestApplication implements Executable { private final RoutingFilterFactory routingFilterFactory; private final Optional pullQueryMetrics; private final RateLimiter pullQueryRateLimiter; + private final ExecutorService pullExecutorService; // The startup thread that can be interrupted if necessary during shutdown. This should only // happen if startup hangs. @@ -226,7 +228,8 @@ public static SourceName getCommandsStreamName() { final DenyListPropertyValidator denyListPropertyValidator, final Optional pullQueryMetrics, final RoutingFilterFactory routingFilterFactory, - final RateLimiter pullQueryRateLimiter + final RateLimiter pullQueryRateLimiter, + final ExecutorService pullExecutorService ) { log.debug("Creating instance of ksqlDB API server"); @@ -283,6 +286,7 @@ public static SourceName getCommandsStreamName() { log.debug("ksqlDB API server instance created"); this.routingFilterFactory = requireNonNull(routingFilterFactory, "routingFilterFactory"); this.pullQueryRateLimiter = requireNonNull(pullQueryRateLimiter, "pullQueryRateLimiter"); + this.pullExecutorService = requireNonNull(pullExecutorService, "pullExecutorService"); } @Override @@ -323,7 +327,8 @@ public void startAsync() { denyListPropertyValidator, pullQueryMetrics, routingFilterFactory, - pullQueryRateLimiter + pullQueryRateLimiter, + pullExecutorService ); startAsyncThreadRef.set(Thread.currentThread()); @@ -344,7 +349,8 @@ public void startAsync() { serverMetadataResource, wsQueryEndpoint, pullQueryMetrics, - pullQueryRateLimiter + pullQueryRateLimiter, + pullExecutorService ); apiServer = new Server(vertx, ksqlRestConfig, endpoints, securityExtension, authenticationPlugin, serverState, pullQueryMetrics); @@ -486,6 +492,14 @@ public void shutdown() { log.error("Exception while waiting for pull query metrics to close", e); } + try { + pullExecutorService.shutdown(); + pullExecutorService.awaitTermination( + Duration.ofSeconds(10).toMillis(), TimeUnit.MILLISECONDS); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + } + try { ksqlEngine.close(); } catch (final Exception e) { @@ -715,6 +729,9 @@ static KsqlRestApplication buildApplication( heartbeatAgent, lagReportingAgent); final RateLimiter pullQueryRateLimiter = RateLimiter.create( ksqlConfig.getInt(KsqlConfig.KSQL_QUERY_PULL_MAX_QPS_CONFIG)); + final ExecutorService pullExecutorService = Executors.newFixedThreadPool( + ksqlConfig.getInt(KsqlConfig.KSQL_QUERY_PULL_THREAD_POOL_SIZE_CONFIG) + ); final DenyListPropertyValidator denyListPropertyValidator = new DenyListPropertyValidator( ksqlConfig.getList(KsqlConfig.KSQL_PROPERTIES_OVERRIDES_DENYLIST)); @@ -739,7 +756,8 @@ static KsqlRestApplication buildApplication( denyListPropertyValidator, pullQueryMetrics, routingFilterFactory, - pullQueryRateLimiter + pullQueryRateLimiter, + pullExecutorService ); final List managedTopics = new LinkedList<>(); @@ -816,7 +834,8 @@ static KsqlRestApplication buildApplication( denyListPropertyValidator, pullQueryMetrics, routingFilterFactory, - pullQueryRateLimiter + pullQueryRateLimiter, + pullExecutorService ); } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerEndpoints.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerEndpoints.java index 68cb19e8eccd..2726599df3d2 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerEndpoints.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerEndpoints.java @@ -57,6 +57,7 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; import java.util.function.Function; import java.util.function.Supplier; import org.reactivestreams.Subscriber; @@ -82,6 +83,7 @@ public class KsqlServerEndpoints implements Endpoints { private final WSQueryEndpoint wsQueryEndpoint; private final Optional pullQueryMetrics; private final RateLimiter rateLimiter; + private final ExecutorService pullExecutorService; // CHECKSTYLE_RULES.OFF: ParameterNumber public KsqlServerEndpoints( @@ -100,7 +102,8 @@ public KsqlServerEndpoints( final ServerMetadataResource serverMetadataResource, final WSQueryEndpoint wsQueryEndpoint, final Optional pullQueryMetrics, - final RateLimiter rateLimiter + final RateLimiter rateLimiter, + final ExecutorService pullExecutorService ) { // CHECKSTYLE_RULES.ON: ParameterNumber @@ -121,6 +124,7 @@ public KsqlServerEndpoints( this.wsQueryEndpoint = Objects.requireNonNull(wsQueryEndpoint); this.pullQueryMetrics = Objects.requireNonNull(pullQueryMetrics); this.rateLimiter = Objects.requireNonNull(rateLimiter); + this.pullExecutorService = Objects.requireNonNull(pullExecutorService, "pullExecutorService"); } @Override @@ -134,7 +138,8 @@ public CompletableFuture createQueryPublisher(final String sql, return executeOnWorker(() -> { try { return new QueryEndpoint( - ksqlEngine, ksqlConfig, routingFilterFactory, pullQueryMetrics, rateLimiter) + ksqlEngine, ksqlConfig, routingFilterFactory, pullQueryMetrics, + rateLimiter, pullExecutorService) .createQueryPublisher( sql, properties, diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java index 799281dad46a..3d6bf6ba200d 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java @@ -41,6 +41,7 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -53,6 +54,7 @@ class PullQueryPublisher implements Flow.Publisher> { private final long startTimeNanos; private final RoutingFilterFactory routingFilterFactory; private final RateLimiter rateLimiter; + private final ExecutorService pullExecutorService; @VisibleForTesting PullQueryPublisher( @@ -62,7 +64,8 @@ class PullQueryPublisher implements Flow.Publisher> { final Optional pullQueryMetrics, final long startTimeNanos, final RoutingFilterFactory routingFilterFactory, - final RateLimiter rateLimiter + final RateLimiter rateLimiter, + final ExecutorService pullExecutorService ) { this.ksqlEngine = requireNonNull(ksqlEngine, "ksqlEngine"); this.serviceContext = requireNonNull(serviceContext, "serviceContext"); @@ -71,6 +74,7 @@ class PullQueryPublisher implements Flow.Publisher> { this.startTimeNanos = startTimeNanos; this.routingFilterFactory = requireNonNull(routingFilterFactory, "routingFilterFactory"); this.rateLimiter = requireNonNull(rateLimiter, "rateLimiter"); + this.pullExecutorService = requireNonNull(pullExecutorService, "pullExecutorService"); } @Override @@ -91,6 +95,7 @@ public synchronized void subscribe(final Subscriber> sub query, routingFilterFactory, routingOptions, + pullExecutorService, pullQueryMetrics ); diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java index a8c7d24776f8..bac570f324d3 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java @@ -65,6 +65,7 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.kafka.common.errors.TopicAuthorizationException; @@ -89,8 +90,9 @@ public class StreamedQueryResource implements KsqlConfigurable { private final DenyListPropertyValidator denyListPropertyValidator; private final Optional pullQueryMetrics; private final RoutingFilterFactory routingFilterFactory; + private final RateLimiter rateLimiter; private KsqlConfig ksqlConfig; - private RateLimiter rateLimiter; + private final ExecutorService pullExecutorService; @SuppressWarnings("checkstyle:ParameterNumber") public StreamedQueryResource( @@ -104,7 +106,8 @@ public StreamedQueryResource( final DenyListPropertyValidator denyListPropertyValidator, final Optional pullQueryMetrics, final RoutingFilterFactory routingFilterFactory, - final RateLimiter rateLimiter + final RateLimiter rateLimiter, + final ExecutorService pullExecutorService ) { this( ksqlEngine, @@ -118,7 +121,8 @@ public StreamedQueryResource( denyListPropertyValidator, pullQueryMetrics, routingFilterFactory, - rateLimiter + rateLimiter, + pullExecutorService ); } @@ -137,7 +141,8 @@ public StreamedQueryResource( final DenyListPropertyValidator denyListPropertyValidator, final Optional pullQueryMetrics, final RoutingFilterFactory routingFilterFactory, - final RateLimiter rateLimiter + final RateLimiter rateLimiter, + final ExecutorService pullExecutorService ) { this.ksqlEngine = Objects.requireNonNull(ksqlEngine, "ksqlEngine"); this.statementParser = Objects.requireNonNull(statementParser, "statementParser"); @@ -156,6 +161,7 @@ public StreamedQueryResource( this.routingFilterFactory = Objects.requireNonNull(routingFilterFactory, "routingFilterFactory"); this.rateLimiter = Objects.requireNonNull(rateLimiter, "rateLimiter"); + this.pullExecutorService = Objects.requireNonNull(pullExecutorService, "pullExecutorService"); } @Override @@ -313,6 +319,7 @@ private EndpointResponse handlePullQuery( configured, routingFilterFactory, routingOptions, + pullExecutorService, pullQueryMetrics ); final TableRows tableRows = new TableRows( diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java index 2ab4c153bfb9..f2a197982616 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java @@ -50,6 +50,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeoutException; import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.utils.Time; @@ -76,6 +77,7 @@ public class WSQueryEndpoint { private final Optional pullQueryMetrics; private final RoutingFilterFactory routingFilterFactory; private final RateLimiter rateLimiter; + private final ExecutorService pullExecutorService; private WebSocketSubscriber subscriber; @@ -94,7 +96,8 @@ public WSQueryEndpoint( final DenyListPropertyValidator denyListPropertyValidator, final Optional pullQueryMetrics, final RoutingFilterFactory routingFilterFactory, - final RateLimiter rateLimiter + final RateLimiter rateLimiter, + final ExecutorService pullExecutorService ) { this( ksqlConfig, @@ -112,7 +115,8 @@ public WSQueryEndpoint( denyListPropertyValidator, pullQueryMetrics, routingFilterFactory, - rateLimiter + rateLimiter, + pullExecutorService ); } @@ -134,7 +138,8 @@ public WSQueryEndpoint( final DenyListPropertyValidator denyListPropertyValidator, final Optional pullQueryMetrics, final RoutingFilterFactory routingFilterFactory, - final RateLimiter rateLimiter + final RateLimiter rateLimiter, + final ExecutorService pullExecutorService ) { this.ksqlConfig = Objects.requireNonNull(ksqlConfig, "ksqlConfig"); this.statementParser = Objects.requireNonNull(statementParser, "statementParser"); @@ -158,6 +163,7 @@ public WSQueryEndpoint( this.routingFilterFactory = Objects.requireNonNull( routingFilterFactory, "routingFilterFactory"); this.rateLimiter = Objects.requireNonNull(rateLimiter, "rateLimiter"); + this.pullExecutorService = Objects.requireNonNull(pullExecutorService, "pullExecutorService"); } public void executeStreamQuery(final ServerWebSocket webSocket, final MultiMap requestParams, @@ -291,7 +297,8 @@ private void handleQuery(final RequestContext info, final Query query, pullQueryMetrics, startTimeNanos, routingFilterFactory, - rateLimiter + rateLimiter, + pullExecutorService ); } else { pushQueryPublisher.start( @@ -355,7 +362,8 @@ private static void startPullQueryPublisher( final Optional pullQueryMetrics, final long startTimeNanos, final RoutingFilterFactory routingFilterFactory, - final RateLimiter rateLimiter + final RateLimiter rateLimiter, + final ExecutorService pullExecutorService ) { new PullQueryPublisher( ksqlEngine, @@ -364,7 +372,8 @@ private static void startPullQueryPublisher( pullQueryMetrics, startTimeNanos, routingFilterFactory, - rateLimiter + rateLimiter, + pullExecutorService ).subscribe(streamSubscriber); } @@ -401,7 +410,8 @@ void start( Optional pullQueryMetrics, long startTimeNanos, RoutingFilterFactory routingFilterFactory, - RateLimiter rateLimiter); + RateLimiter rateLimiter, + ExecutorService pullExecutorService); } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java index 490c35e795c9..42def51e36cf 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java @@ -69,6 +69,7 @@ import java.util.Map; import java.util.Optional; import java.util.Queue; +import java.util.concurrent.ExecutorService; import java.util.function.Consumer; import org.apache.kafka.common.metrics.MetricsReporter; import org.apache.kafka.streams.StreamsConfig; @@ -143,6 +144,8 @@ public class KsqlRestApplicationTest { private RoutingFilterFactory routingFilterFactory; @Mock private RateLimiter rateLimiter; + @Mock + private ExecutorService pullExecutorService; @Mock private Vertx vertx; @@ -503,7 +506,8 @@ private void givenAppWithRestConfig(final Map restConfigMap) { denyListPropertyValidator, Optional.empty(), routingFilterFactory, - rateLimiter + rateLimiter, + pullExecutorService ); } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/WSQueryEndpointTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/WSQueryEndpointTest.java index 3a9b1b5f77d2..9efd7a68261d 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/WSQueryEndpointTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/WSQueryEndpointTest.java @@ -41,6 +41,7 @@ import java.util.Collections; import java.util.Map; import java.util.Optional; +import java.util.concurrent.ExecutorService; import org.apache.kafka.streams.StreamsConfig; import org.junit.Before; import org.junit.Test; @@ -78,7 +79,8 @@ public void setUp() { denyListPropertyValidator, Optional.empty(), mock(RoutingFilterFactory.class), - mock(RateLimiter.class) + mock(RateLimiter.class), + mock(ExecutorService.class) ); } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisherTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisherTest.java index f016f7a74ee0..0811661dbaab 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisherTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisherTest.java @@ -45,6 +45,8 @@ import java.util.Collection; import java.util.List; import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -88,6 +90,9 @@ public class PullQueryPublisherTest { private SessionConfig sessionConfig; @Mock private KsqlConfig ksqlConfig; + @Mock + private ExecutorService executorService; + @Captor private ArgumentCaptor subscriptionCaptor; @@ -103,7 +108,9 @@ public void setUp() { Optional.empty(), TIME_NANOS, routingFilterFactory, - create(1)); + create(1), + executorService); + when(statement.getStatementText()).thenReturn(""); when(statement.getSessionConfig()).thenReturn(sessionConfig); @@ -113,7 +120,7 @@ public void setUp() { when(pullQueryResult.getSchema()).thenReturn(PULL_SCHEMA); when(pullQueryResult.getTableRows()).thenReturn(tableRows); when(pullQueryResult.getSourceNodes()).thenReturn(Optional.empty()); - when(engine.executePullQuery(any(), any(), any(), any(), any())) + when(engine.executePullQuery(any(), any(), any(), any(), any(), any())) .thenReturn(pullQueryResult); doAnswer(callRequestAgain()).when(subscriber).onNext(any()); @@ -137,7 +144,8 @@ public void shouldRunQueryWithCorrectParams() { subscription.request(1); // Then: - verify(engine).executePullQuery(eq(serviceContext), eq(statement), eq(routingFilterFactory), any(), eq(Optional.empty())); + verify(engine).executePullQuery( + eq(serviceContext), eq(statement), eq(routingFilterFactory), any(), eq(executorService), eq(Optional.empty())); } @Test @@ -150,7 +158,8 @@ public void shouldOnlyExecuteOnce() { // Then: verify(subscriber).onNext(any()); - verify(engine).executePullQuery(eq(serviceContext), eq(statement), eq(routingFilterFactory), any(), eq(Optional.empty())); + verify(engine).executePullQuery( + eq(serviceContext), eq(statement), eq(routingFilterFactory), any(), eq(executorService), eq(Optional.empty())); } @Test @@ -185,7 +194,7 @@ public void shouldCallOnErrorOnFailure() { // Given: givenSubscribed(); final Throwable e = new RuntimeException("Boom!"); - when(engine.executePullQuery(any(), any(), any(), any(), any())).thenThrow(e); + when(engine.executePullQuery(any(), any(), any(), any(), any(), any())).thenThrow(e); // When: subscription.request(1); diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java index d885bfa970b4..ffdb5dc0eb1b 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java @@ -109,6 +109,7 @@ import java.util.Optional; import java.util.Scanner; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -220,7 +221,8 @@ public void setup() { denyListPropertyValidator, Optional.empty(), routingFilterFactory, - rateLimiter + rateLimiter, + Executors.newFixedThreadPool(1) ); testResource.configure(VALID_CONFIG); @@ -301,10 +303,11 @@ public void shouldRateLimit() { denyListPropertyValidator, Optional.empty(), routingFilterFactory, - pullQueryRateLimiter + pullQueryRateLimiter, + Executors.newFixedThreadPool(1) ); testResource.configure(VALID_CONFIG); - when(mockKsqlEngine.executePullQuery(any(), any(), any(), any(), any())).thenReturn(pullQueryResult); + when(mockKsqlEngine.executePullQuery(any(), any(), any(), any(), any(), any())).thenReturn(pullQueryResult); when(pullQueryResult.getTableRows()).thenReturn(Collections.emptyList()); when(pullQueryResult.getSchema()).thenReturn(schema); when(pullQueryResult.getQueryId()).thenReturn(queryId); @@ -346,7 +349,8 @@ public void shouldThrowOnHandleStatementIfNotConfigured() { denyListPropertyValidator, Optional.empty(), routingFilterFactory, - rateLimiter + rateLimiter, + Executors.newFixedThreadPool(1) ); // When: @@ -509,7 +513,8 @@ public void shouldThrowOnDenyListedStreamProperty() { denyListPropertyValidator, Optional.empty(), routingFilterFactory, - rateLimiter + rateLimiter, + Executors.newFixedThreadPool(1) ); final Map props = new HashMap<>(ImmutableMap.of( StreamsConfig.APPLICATION_SERVER_CONFIG, "something:1"