Skip to content

Commit

Permalink
fix: creating threads per request
Browse files Browse the repository at this point in the history
  • Loading branch information
vpapavas committed Nov 24, 2020
1 parent 1e19204 commit 8d2ed9c
Show file tree
Hide file tree
Showing 16 changed files with 127 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;

/**
* The context in which statements can be executed.
Expand Down Expand Up @@ -154,6 +155,7 @@ PullQueryResult executePullQuery(
ConfiguredStatement<Query> statement,
RoutingFilterFactory routingFilterFactory,
RoutingOptions routingOptions,
ExecutorService executorService,
Optional<PullQueryExecutorMetrics> pullQueryMetrics
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -142,6 +143,7 @@ PullQueryResult executePullQuery(
final ConfiguredStatement<Query> statement,
final RoutingFilterFactory routingFilterFactory,
final RoutingOptions routingOptions,
final ExecutorService executorService,
final Optional<PullQueryExecutorMetrics> pullQueryMetrics
) {

Expand All @@ -164,12 +166,11 @@ PullQueryResult executePullQuery(
ksqlConfig,
analysis,
statement);

try (HARouting routing = new HARouting(
ksqlConfig, physicalPlan, routingFilterFactory, routingOptions, statement, serviceContext,
physicalPlan.getOutputSchema(), physicalPlan.getQueryId(), pullQueryMetrics)) {
return routing.handlePullQuery();
}
final HARouting routing = new HARouting(
physicalPlan, routingFilterFactory, routingOptions, statement, serviceContext,
physicalPlan.getOutputSchema(), physicalPlan.getQueryId(), executorService,
pullQueryMetrics);
return routing.handlePullQuery();
} catch (final Exception e) {
pullQueryMetrics.ifPresent(metrics -> metrics.recordErrorRate(1));
throw new KsqlStatementException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -268,6 +269,7 @@ public PullQueryResult executePullQuery(
final ConfiguredStatement<Query> statement,
final RoutingFilterFactory routingFilterFactory,
final RoutingOptions routingOptions,
final ExecutorService executorService,
final Optional<PullQueryExecutorMetrics> pullQueryMetrics
) {
return EngineExecutor
Expand All @@ -280,6 +282,7 @@ public PullQueryResult executePullQuery(
statement,
routingFilterFactory,
routingOptions,
executorService,
pullQueryMetrics
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;

/**
* An execution context that can execute statements without changing the core engine's state
Expand Down Expand Up @@ -164,6 +165,7 @@ public PullQueryResult executePullQuery(
final ConfiguredStatement<Query> statement,
final RoutingFilterFactory routingFilterFactory,
final RoutingOptions routingOptions,
final ExecutorService executorService,
final Optional<PullQueryExecutorMetrics> pullQueryMetrics
) {
return EngineExecutor.create(
Expand All @@ -174,6 +176,7 @@ public PullQueryResult executePullQuery(
statement,
routingFilterFactory,
routingOptions,
executorService,
pullQueryMetrics
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlRequestConfig;
import io.confluent.ksql.util.KsqlServerException;
Expand All @@ -47,7 +46,6 @@
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.slf4j.Logger;
Expand All @@ -68,30 +66,30 @@ public final class HARouting implements AutoCloseable {
private final RouteQuery routeQuery;

public HARouting(
final KsqlConfig ksqlConfig,
final PullPhysicalPlan pullPhysicalPlan,
final RoutingFilterFactory routingFilterFactory,
final RoutingOptions routingOptions,
final ConfiguredStatement<Query> statement,
final ServiceContext serviceContext,
final LogicalSchema outputSchema,
final QueryId queryId,
final ExecutorService executorService,
final Optional<PullQueryExecutorMetrics> pullQueryMetrics
) {
this(ksqlConfig, pullPhysicalPlan, routingFilterFactory, routingOptions, statement,
serviceContext, outputSchema, queryId, pullQueryMetrics, HARouting::executeOrRouteQuery);
this(pullPhysicalPlan, routingFilterFactory, routingOptions, statement, serviceContext,
outputSchema, queryId, executorService, pullQueryMetrics, HARouting::executeOrRouteQuery);
}

@VisibleForTesting
HARouting(
final KsqlConfig ksqlConfig,
final PullPhysicalPlan pullPhysicalPlan,
final RoutingFilterFactory routingFilterFactory,
final RoutingOptions routingOptions,
final ConfiguredStatement<Query> statement,
final ServiceContext serviceContext,
final LogicalSchema outputSchema,
final QueryId queryId,
final ExecutorService executorService,
final Optional<PullQueryExecutorMetrics> pullQueryMetrics,
final RouteQuery routeQuery
) {
Expand All @@ -103,11 +101,9 @@ public HARouting(
this.serviceContext = Objects.requireNonNull(serviceContext, "serviceContext");
this.outputSchema = Objects.requireNonNull(outputSchema, "outputSchema");
this.queryId = Objects.requireNonNull(queryId, "queryId");
this.executorService = Objects.requireNonNull(executorService, "executorService");
this.pullQueryMetrics = Objects.requireNonNull(pullQueryMetrics, "pullQueryMetrics");
this.executorService = Executors.newFixedThreadPool(
ksqlConfig.getInt(KsqlConfig.KSQL_QUERY_PULL_THREAD_POOL_SIZE_CONFIG)
);
this.routeQuery = Objects.requireNonNull(routeQuery, "routeQuery");
this.routeQuery = routeQuery;
}

public PullQueryResult handlePullQuery() throws InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlConfig;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executors;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -82,8 +82,6 @@ public class HARoutingTest {
@Mock
private RoutingFilterFactory routingFilterFactory;
@Mock
private KsqlConfig ksqlConfig;
@Mock
private PullPhysicalPlan pullPhysicalPlan;
@Mock
private Materialization materialization;
Expand All @@ -101,12 +99,10 @@ public void setUp() {
when(location2.getNodes()).thenReturn(ImmutableList.of(node2, node1));
when(location3.getNodes()).thenReturn(ImmutableList.of(node1, node2));
when(location4.getNodes()).thenReturn(ImmutableList.of(node2, node1));
when(ksqlConfig.getInt(KsqlConfig.KSQL_QUERY_PULL_THREAD_POOL_SIZE_CONFIG))
.thenReturn(1);

haRouting = new HARouting(
ksqlConfig, pullPhysicalPlan, routingFilterFactory, routingOptions, statement,
serviceContext, logicalSchema, queryId, Optional.empty(), routeQuery);
pullPhysicalPlan, routingFilterFactory, routingOptions, statement, serviceContext,
logicalSchema, queryId, Executors.newFixedThreadPool(1), Optional.empty(), routeQuery);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import org.apache.kafka.common.utils.Time;

Expand All @@ -59,19 +60,22 @@ public class QueryEndpoint {
private final RoutingFilterFactory routingFilterFactory;
private final Optional<PullQueryExecutorMetrics> pullQueryMetrics;
private final RateLimiter rateLimiter;
private final ExecutorService pullExecutorService;

public QueryEndpoint(
final KsqlEngine ksqlEngine,
final KsqlConfig ksqlConfig,
final RoutingFilterFactory routingFilterFactory,
final Optional<PullQueryExecutorMetrics> pullQueryMetrics,
final RateLimiter rateLimiter
final RateLimiter rateLimiter,
final ExecutorService pullExecutorService
) {
this.ksqlEngine = ksqlEngine;
this.ksqlConfig = ksqlConfig;
this.routingFilterFactory = routingFilterFactory;
this.pullQueryMetrics = pullQueryMetrics;
this.rateLimiter = rateLimiter;
this.pullExecutorService = pullExecutorService;
}

public QueryPublisher createQueryPublisher(
Expand Down Expand Up @@ -133,6 +137,7 @@ private QueryPublisher createPullQueryPublisher(
statement,
routingFilterFactory,
routingOptions,
pullExecutorService,
pullQueryMetrics
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -188,6 +189,7 @@ public final class KsqlRestApplication implements Executable {
private final RoutingFilterFactory routingFilterFactory;
private final Optional<PullQueryExecutorMetrics> pullQueryMetrics;
private final RateLimiter pullQueryRateLimiter;
private final ExecutorService pullExecutorService;

// The startup thread that can be interrupted if necessary during shutdown. This should only
// happen if startup hangs.
Expand Down Expand Up @@ -226,7 +228,8 @@ public static SourceName getCommandsStreamName() {
final DenyListPropertyValidator denyListPropertyValidator,
final Optional<PullQueryExecutorMetrics> pullQueryMetrics,
final RoutingFilterFactory routingFilterFactory,
final RateLimiter pullQueryRateLimiter
final RateLimiter pullQueryRateLimiter,
final ExecutorService pullExecutorService

) {
log.debug("Creating instance of ksqlDB API server");
Expand Down Expand Up @@ -283,6 +286,7 @@ public static SourceName getCommandsStreamName() {
log.debug("ksqlDB API server instance created");
this.routingFilterFactory = requireNonNull(routingFilterFactory, "routingFilterFactory");
this.pullQueryRateLimiter = requireNonNull(pullQueryRateLimiter, "pullQueryRateLimiter");
this.pullExecutorService = requireNonNull(pullExecutorService, "pullExecutorService");
}

@Override
Expand Down Expand Up @@ -323,7 +327,8 @@ public void startAsync() {
denyListPropertyValidator,
pullQueryMetrics,
routingFilterFactory,
pullQueryRateLimiter
pullQueryRateLimiter,
pullExecutorService
);

startAsyncThreadRef.set(Thread.currentThread());
Expand All @@ -344,7 +349,8 @@ public void startAsync() {
serverMetadataResource,
wsQueryEndpoint,
pullQueryMetrics,
pullQueryRateLimiter
pullQueryRateLimiter,
pullExecutorService
);
apiServer = new Server(vertx, ksqlRestConfig, endpoints, securityExtension,
authenticationPlugin, serverState, pullQueryMetrics);
Expand Down Expand Up @@ -486,6 +492,14 @@ public void shutdown() {
log.error("Exception while waiting for pull query metrics to close", e);
}

try {
pullExecutorService.shutdown();
pullExecutorService.awaitTermination(
Duration.ofSeconds(10).toMillis(), TimeUnit.MILLISECONDS);
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
}

try {
ksqlEngine.close();
} catch (final Exception e) {
Expand Down Expand Up @@ -715,6 +729,9 @@ static KsqlRestApplication buildApplication(
heartbeatAgent, lagReportingAgent);
final RateLimiter pullQueryRateLimiter = RateLimiter.create(
ksqlConfig.getInt(KsqlConfig.KSQL_QUERY_PULL_MAX_QPS_CONFIG));
final ExecutorService pullExecutorService = Executors.newFixedThreadPool(
ksqlConfig.getInt(KsqlConfig.KSQL_QUERY_PULL_THREAD_POOL_SIZE_CONFIG)
);

final DenyListPropertyValidator denyListPropertyValidator = new DenyListPropertyValidator(
ksqlConfig.getList(KsqlConfig.KSQL_PROPERTIES_OVERRIDES_DENYLIST));
Expand All @@ -739,7 +756,8 @@ static KsqlRestApplication buildApplication(
denyListPropertyValidator,
pullQueryMetrics,
routingFilterFactory,
pullQueryRateLimiter
pullQueryRateLimiter,
pullExecutorService
);

final List<String> managedTopics = new LinkedList<>();
Expand Down Expand Up @@ -816,7 +834,8 @@ static KsqlRestApplication buildApplication(
denyListPropertyValidator,
pullQueryMetrics,
routingFilterFactory,
pullQueryRateLimiter
pullQueryRateLimiter,
pullExecutorService
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.function.Supplier;
import org.reactivestreams.Subscriber;
Expand All @@ -82,6 +83,7 @@ public class KsqlServerEndpoints implements Endpoints {
private final WSQueryEndpoint wsQueryEndpoint;
private final Optional<PullQueryExecutorMetrics> pullQueryMetrics;
private final RateLimiter rateLimiter;
private final ExecutorService pullExecutorService;

// CHECKSTYLE_RULES.OFF: ParameterNumber
public KsqlServerEndpoints(
Expand All @@ -100,7 +102,8 @@ public KsqlServerEndpoints(
final ServerMetadataResource serverMetadataResource,
final WSQueryEndpoint wsQueryEndpoint,
final Optional<PullQueryExecutorMetrics> pullQueryMetrics,
final RateLimiter rateLimiter
final RateLimiter rateLimiter,
final ExecutorService pullExecutorService
) {

// CHECKSTYLE_RULES.ON: ParameterNumber
Expand All @@ -121,6 +124,7 @@ public KsqlServerEndpoints(
this.wsQueryEndpoint = Objects.requireNonNull(wsQueryEndpoint);
this.pullQueryMetrics = Objects.requireNonNull(pullQueryMetrics);
this.rateLimiter = Objects.requireNonNull(rateLimiter);
this.pullExecutorService = Objects.requireNonNull(pullExecutorService, "pullExecutorService");
}

@Override
Expand All @@ -134,7 +138,8 @@ public CompletableFuture<QueryPublisher> createQueryPublisher(final String sql,
return executeOnWorker(() -> {
try {
return new QueryEndpoint(
ksqlEngine, ksqlConfig, routingFilterFactory, pullQueryMetrics, rateLimiter)
ksqlEngine, ksqlConfig, routingFilterFactory, pullQueryMetrics,
rateLimiter, pullExecutorService)
.createQueryPublisher(
sql,
properties,
Expand Down
Loading

0 comments on commit 8d2ed9c

Please sign in to comment.