Skip to content

Commit

Permalink
Inject a new ServiceContext per REST call (#2705)
Browse files Browse the repository at this point in the history
A new ServiceContext is created per user request so it can be used to initialize Kafka and SR services using user credentials for future impersonation work.
  • Loading branch information
spena authored Apr 25, 2019
1 parent 70d602b commit 6915220
Show file tree
Hide file tree
Showing 12 changed files with 204 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import io.confluent.ksql.rest.server.computation.CommandRunner;
import io.confluent.ksql.rest.server.computation.CommandStore;
import io.confluent.ksql.rest.server.computation.StatementExecutor;
import io.confluent.ksql.rest.server.context.KsqlRestServiceContextBinder;
import io.confluent.ksql.rest.server.resources.KsqlExceptionMapper;
import io.confluent.ksql.rest.server.resources.KsqlResource;
import io.confluent.ksql.rest.server.resources.RootDocument;
Expand Down Expand Up @@ -244,6 +245,7 @@ public void configureBaseApplication(
new JacksonMessageBodyProvider(JsonMapper.INSTANCE.mapper);
config.register(jsonProvider);
config.register(JsonParseExceptionMapper.class);
config.register(new KsqlRestServiceContextBinder(ksqlConfig));

// Don't want to buffer rows when streaming JSON in a request to the query resource
config.property(ServerProperties.OUTBOUND_CONTENT_LENGTH_BUFFER, 0);
Expand Down Expand Up @@ -392,7 +394,6 @@ static KsqlRestApplication buildApplication(
final StreamedQueryResource streamedQueryResource = new StreamedQueryResource(
ksqlConfig,
ksqlEngine,
serviceContext,
statementParser,
commandStore,
Duration.ofMillis(
Expand All @@ -404,7 +405,6 @@ static KsqlRestApplication buildApplication(
final KsqlResource ksqlResource = new KsqlResource(
ksqlConfig,
ksqlEngine,
serviceContext,
commandStore,
Duration.ofMillis(restConfig.getLong(DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG)),
versionChecker::updateLastRequestTime,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2018 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.server.context;

import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.KsqlConfig;
import org.glassfish.hk2.utilities.binding.AbstractBinder;
import org.glassfish.jersey.process.internal.RequestScoped;

/**
* Configures the {@link ServiceContext} class for dependency injection using the
* {@link javax.ws.rs.core.Context} annotation.
* </p>
* Inject {@code ServiceContext} on each REST method as follows:
* i.e. myMethod(@Context ServiceContext serviceContext)
*/
public class KsqlRestServiceContextBinder extends AbstractBinder {
private final KsqlConfig ksqlConfig;

public KsqlRestServiceContextBinder(final KsqlConfig ksqlConfig) {
this.ksqlConfig = ksqlConfig;
}

@Override
protected void configure() {
bindFactory(new KsqlRestServiceContextFactory(ksqlConfig))
.to(ServiceContext.class)
.in(RequestScoped.class);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2018 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.server.context;

import io.confluent.ksql.services.DefaultServiceContext;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.KsqlConfig;
import org.glassfish.hk2.api.Factory;

/**
* This class implements {@link Factory}, which allows a REST application to create
* a new {@link ServiceContext} during REST requests.
*/
public class KsqlRestServiceContextFactory implements Factory<ServiceContext> {
private final KsqlConfig ksqlConfig;

public KsqlRestServiceContextFactory(final KsqlConfig ksqlConfig) {
this.ksqlConfig = ksqlConfig;
}

@Override
public ServiceContext provide() {
return DefaultServiceContext.create(ksqlConfig);
}

@Override
public void dispose(final ServiceContext serviceContext) {
serviceContext.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ public class RequestHandler {
private final Map<Class<? extends Statement>, StatementExecutor<?>> customExecutors;
private final KsqlEngine ksqlEngine;
private final KsqlConfig ksqlConfig;
private final ServiceContext serviceContext;
private final DistributingExecutor distributor;
private final CommandQueueSync commandQueueSync;

Expand All @@ -56,25 +55,23 @@ public class RequestHandler {
* @param ksqlEngine the primary KSQL engine - the state of this engine <b>will</b>
* be directly modified by this class
* @param ksqlConfig a configuration
* @param serviceContext a service context
*/
public RequestHandler(
final Map<Class<? extends Statement>, StatementExecutor<?>> customExecutors,
final DistributingExecutor distributor,
final KsqlEngine ksqlEngine,
final KsqlConfig ksqlConfig,
final ServiceContext serviceContext,
final CommandQueueSync commandQueueSync
) {
this.customExecutors = Objects.requireNonNull(customExecutors, "customExecutors");
this.ksqlEngine = Objects.requireNonNull(ksqlEngine, "ksqlEngine");
this.ksqlConfig = Objects.requireNonNull(ksqlConfig, "ksqlConfig");
this.serviceContext = Objects.requireNonNull(serviceContext, "serviceContext");
this.distributor = Objects.requireNonNull(distributor, "distributor");
this.commandQueueSync = Objects.requireNonNull(commandQueueSync, "commandQueueSync");
}

public KsqlEntityList execute(
final ServiceContext serviceContext,
final List<ParsedStatement> statements,
final Map<String, Object> propertyOverrides
) {
Expand All @@ -83,7 +80,7 @@ public KsqlEntityList execute(
for (ParsedStatement parsed : statements) {
final PreparedStatement<?> prepared = ksqlEngine.prepare(parsed);
if (prepared.getStatement() instanceof RunScript) {
final KsqlEntityList result = executeRunScript(prepared, propertyOverrides);
final KsqlEntityList result = executeRunScript(serviceContext, prepared, propertyOverrides);
if (!result.isEmpty()) {
// This is to maintain backwards compatibility until we deprecate
// RunScript in the next major release - the expected behavior was
Expand All @@ -93,14 +90,15 @@ public KsqlEntityList execute(
} else {
final ConfiguredStatement<?> configured = ConfiguredStatement.of(
prepared, scopedPropertyOverrides, ksqlConfig);
executeStatement(configured, entities).ifPresent(entities::add);
executeStatement(serviceContext, configured, entities).ifPresent(entities::add);
}
}
return entities;
}

@SuppressWarnings("unchecked")
private <T extends Statement> Optional<KsqlEntity> executeStatement(
final ServiceContext serviceContext,
final ConfiguredStatement<T> configured,
final KsqlEntityList entities
) {
Expand All @@ -117,6 +115,7 @@ private <T extends Statement> Optional<KsqlEntity> executeStatement(
}

private KsqlEntityList executeRunScript(
final ServiceContext serviceContext,
final PreparedStatement<?> statement,
final Map<String, Object> propertyOverrides) {
final String sql = (String) propertyOverrides
Expand All @@ -127,7 +126,7 @@ private KsqlEntityList executeRunScript(
"Request is missing script content", statement.getStatementText());
}

return execute(ksqlEngine.parse(sql), propertyOverrides);
return execute(serviceContext, ksqlEngine.parse(sql), propertyOverrides);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

Expand Down Expand Up @@ -93,7 +94,6 @@ public class KsqlResource {
public KsqlResource(
final KsqlConfig ksqlConfig,
final KsqlEngine ksqlEngine,
final ServiceContext serviceContext,
final CommandQueue commandQueue,
final Duration distributedCmdResponseTimeout,
final ActivenessRegistrar activenessRegistrar,
Expand All @@ -110,7 +110,6 @@ public KsqlResource(
CustomValidators.VALIDATOR_MAP,
injectorFactory,
ksqlEngine::createSandbox,
SandboxedServiceContext.create(serviceContext),
ksqlConfig);
this.handler = new RequestHandler(
CustomExecutors.EXECUTOR_MAP,
Expand All @@ -120,7 +119,6 @@ public KsqlResource(
injectorFactory),
ksqlEngine,
ksqlConfig,
serviceContext,
new DefaultCommandQueueSync(
commandQueue,
KsqlResource::shouldSynchronize,
Expand All @@ -130,11 +128,14 @@ public KsqlResource(

@POST
@Path("/terminate")
public Response terminateCluster(final ClusterTerminateRequest request) {
public Response terminateCluster(
@Context final ServiceContext serviceContext,
final ClusterTerminateRequest request
) {
ensureValidPatterns(request.getDeleteTopicList());
try {
return Response.ok(
handler.execute(TERMINATE_CLUSTER, request.getStreamsProperties())
handler.execute(serviceContext, TERMINATE_CLUSTER, request.getStreamsProperties())
).build();
} catch (final Exception e) {
return Errors.serverErrorForStatement(
Expand All @@ -143,7 +144,10 @@ public Response terminateCluster(final ClusterTerminateRequest request) {
}

@POST
public Response handleKsqlStatements(final KsqlRequest request) {
public Response handleKsqlStatements(
@Context final ServiceContext serviceContext,
final KsqlRequest request
) {
if (!ksqlEngine.isAcceptingStatements()) {
return Errors.serverErrorForStatement(
new KsqlException("The cluster has been terminated. No new request will be accepted."),
Expand All @@ -152,16 +156,26 @@ public Response handleKsqlStatements(final KsqlRequest request) {
);
}
activenessRegistrar.updateLastRequestTime();

try {
CommandStoreUtil.httpWaitForCommandSequenceNumber(
commandQueue,
request,
distributedCmdResponseTimeout);

final List<ParsedStatement> statements = ksqlEngine.parse(request.getKsql());
validator.validate(statements, request.getStreamsProperties(), request.getKsql());
validator.validate(
SandboxedServiceContext.create(serviceContext),
statements,
request.getStreamsProperties(),
request.getKsql()
);

final KsqlEntityList entities = handler.execute(statements, request.getStreamsProperties());
final KsqlEntityList entities = handler.execute(
serviceContext,
statements,
request.getStreamsProperties()
);
return Response.ok(entities).build();
} catch (final KsqlRestException e) {
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.slf4j.Logger;
Expand All @@ -58,7 +59,6 @@ public class StreamedQueryResource {

private final KsqlConfig ksqlConfig;
private final KsqlEngine ksqlEngine;
private final ServiceContext serviceContext;
private final StatementParser statementParser;
private final CommandQueue commandQueue;
private final Duration disconnectCheckInterval;
Expand All @@ -69,7 +69,6 @@ public class StreamedQueryResource {
public StreamedQueryResource(
final KsqlConfig ksqlConfig,
final KsqlEngine ksqlEngine,
final ServiceContext serviceContext,
final StatementParser statementParser,
final CommandQueue commandQueue,
final Duration disconnectCheckInterval,
Expand All @@ -78,7 +77,6 @@ public StreamedQueryResource(
) {
this.ksqlConfig = Objects.requireNonNull(ksqlConfig, "ksqlConfig");
this.ksqlEngine = Objects.requireNonNull(ksqlEngine, "ksqlEngine");
this.serviceContext = Objects.requireNonNull(serviceContext, "serviceContext");
this.statementParser = Objects.requireNonNull(statementParser, "statementParser");
this.commandQueue = Objects.requireNonNull(commandQueue, "commandQueue");
this.disconnectCheckInterval =
Expand All @@ -91,7 +89,10 @@ public StreamedQueryResource(
}

@POST
public Response streamQuery(final KsqlRequest request) throws Exception {
public Response streamQuery(
@Context final ServiceContext serviceContext,
final KsqlRequest request
) throws Exception {
if (!ksqlEngine.isAcceptingStatements()) {
return Errors.serverErrorForStatement(
new KsqlException("Cluster has been terminated."),
Expand All @@ -106,7 +107,7 @@ public Response streamQuery(final KsqlRequest request) throws Exception {
CommandStoreUtil.httpWaitForCommandSequenceNumber(
commandQueue, request, commandQueueCatchupTimeout);

return handleStatement(request, statement);
return handleStatement(serviceContext, request, statement);
}

private PreparedStatement<?> parseStatement(final KsqlRequest request) {
Expand All @@ -124,6 +125,7 @@ private PreparedStatement<?> parseStatement(final KsqlRequest request) {

@SuppressWarnings("unchecked")
private Response handleStatement(
final ServiceContext serviceContext,
final KsqlRequest request,
final PreparedStatement<?> statement
) throws Exception {
Expand All @@ -134,6 +136,7 @@ private Response handleStatement(

if (statement.getStatement() instanceof PrintTopic) {
return handlePrintTopic(
serviceContext,
request.getStreamsProperties(),
(PreparedStatement<PrintTopic>) statement);
}
Expand Down Expand Up @@ -173,6 +176,7 @@ private Response handleQuery(
}

private Response handlePrintTopic(
final ServiceContext serviceContext,
final Map<String, Object> streamProperties,
final PreparedStatement<PrintTopic> statement
) {
Expand Down
Loading

0 comments on commit 6915220

Please sign in to comment.