From 69152204e1412d5b8218889f96f9e2e2be9ccef1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20Pe=C3=B1a?= Date: Thu, 25 Apr 2019 13:32:52 -0500 Subject: [PATCH] Inject a new ServiceContext per REST call (#2705) A new ServiceContext is created per user request so it can be used to initialize Kafka and SR services using user credentials for future impersonation work. --- .../ksql/rest/server/KsqlRestApplication.java | 4 +- .../context/KsqlRestServiceContextBinder.java | 43 +++++++++++++++++++ .../KsqlRestServiceContextFactory.java | 43 +++++++++++++++++++ .../rest/server/execution/RequestHandler.java | 13 +++--- .../rest/server/resources/KsqlResource.java | 30 +++++++++---- .../streaming/StreamedQueryResource.java | 14 +++--- .../server/validation/RequestValidator.java | 15 ++++--- .../rest/server/computation/RecoveryTest.java | 3 +- .../server/execution/RequestHandlerTest.java | 17 +++++--- .../server/resources/KsqlResourceTest.java | 35 +++++++-------- .../resources/StreamedQueryResourceTest.java | 41 ++++++++++++++---- .../validation/RequestValidatorTest.java | 24 +++++------ 12 files changed, 204 insertions(+), 78 deletions(-) create mode 100644 ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/context/KsqlRestServiceContextBinder.java create mode 100644 ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/context/KsqlRestServiceContextFactory.java diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index 46422cf3761d..25a8544ce9d0 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -45,6 +45,7 @@ import io.confluent.ksql.rest.server.computation.CommandRunner; import io.confluent.ksql.rest.server.computation.CommandStore; import io.confluent.ksql.rest.server.computation.StatementExecutor; +import io.confluent.ksql.rest.server.context.KsqlRestServiceContextBinder; import io.confluent.ksql.rest.server.resources.KsqlExceptionMapper; import io.confluent.ksql.rest.server.resources.KsqlResource; import io.confluent.ksql.rest.server.resources.RootDocument; @@ -244,6 +245,7 @@ public void configureBaseApplication( new JacksonMessageBodyProvider(JsonMapper.INSTANCE.mapper); config.register(jsonProvider); config.register(JsonParseExceptionMapper.class); + config.register(new KsqlRestServiceContextBinder(ksqlConfig)); // Don't want to buffer rows when streaming JSON in a request to the query resource config.property(ServerProperties.OUTBOUND_CONTENT_LENGTH_BUFFER, 0); @@ -392,7 +394,6 @@ static KsqlRestApplication buildApplication( final StreamedQueryResource streamedQueryResource = new StreamedQueryResource( ksqlConfig, ksqlEngine, - serviceContext, statementParser, commandStore, Duration.ofMillis( @@ -404,7 +405,6 @@ static KsqlRestApplication buildApplication( final KsqlResource ksqlResource = new KsqlResource( ksqlConfig, ksqlEngine, - serviceContext, commandStore, Duration.ofMillis(restConfig.getLong(DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG)), versionChecker::updateLastRequestTime, diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/context/KsqlRestServiceContextBinder.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/context/KsqlRestServiceContextBinder.java new file mode 100644 index 000000000000..b46365d64fc5 --- /dev/null +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/context/KsqlRestServiceContextBinder.java @@ -0,0 +1,43 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.server.context; + +import io.confluent.ksql.services.ServiceContext; +import io.confluent.ksql.util.KsqlConfig; +import org.glassfish.hk2.utilities.binding.AbstractBinder; +import org.glassfish.jersey.process.internal.RequestScoped; + +/** + * Configures the {@link ServiceContext} class for dependency injection using the + * {@link javax.ws.rs.core.Context} annotation. + *

+ * Inject {@code ServiceContext} on each REST method as follows: + * i.e. myMethod(@Context ServiceContext serviceContext) + */ +public class KsqlRestServiceContextBinder extends AbstractBinder { + private final KsqlConfig ksqlConfig; + + public KsqlRestServiceContextBinder(final KsqlConfig ksqlConfig) { + this.ksqlConfig = ksqlConfig; + } + + @Override + protected void configure() { + bindFactory(new KsqlRestServiceContextFactory(ksqlConfig)) + .to(ServiceContext.class) + .in(RequestScoped.class); + } +} diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/context/KsqlRestServiceContextFactory.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/context/KsqlRestServiceContextFactory.java new file mode 100644 index 000000000000..9f0ea1d16573 --- /dev/null +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/context/KsqlRestServiceContextFactory.java @@ -0,0 +1,43 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.server.context; + +import io.confluent.ksql.services.DefaultServiceContext; +import io.confluent.ksql.services.ServiceContext; +import io.confluent.ksql.util.KsqlConfig; +import org.glassfish.hk2.api.Factory; + +/** + * This class implements {@link Factory}, which allows a REST application to create + * a new {@link ServiceContext} during REST requests. + */ +public class KsqlRestServiceContextFactory implements Factory { + private final KsqlConfig ksqlConfig; + + public KsqlRestServiceContextFactory(final KsqlConfig ksqlConfig) { + this.ksqlConfig = ksqlConfig; + } + + @Override + public ServiceContext provide() { + return DefaultServiceContext.create(ksqlConfig); + } + + @Override + public void dispose(final ServiceContext serviceContext) { + serviceContext.close(); + } +} diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/RequestHandler.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/RequestHandler.java index dd4140e90729..0c9bb07f5374 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/RequestHandler.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/RequestHandler.java @@ -44,7 +44,6 @@ public class RequestHandler { private final Map, StatementExecutor> customExecutors; private final KsqlEngine ksqlEngine; private final KsqlConfig ksqlConfig; - private final ServiceContext serviceContext; private final DistributingExecutor distributor; private final CommandQueueSync commandQueueSync; @@ -56,25 +55,23 @@ public class RequestHandler { * @param ksqlEngine the primary KSQL engine - the state of this engine will * be directly modified by this class * @param ksqlConfig a configuration - * @param serviceContext a service context */ public RequestHandler( final Map, StatementExecutor> customExecutors, final DistributingExecutor distributor, final KsqlEngine ksqlEngine, final KsqlConfig ksqlConfig, - final ServiceContext serviceContext, final CommandQueueSync commandQueueSync ) { this.customExecutors = Objects.requireNonNull(customExecutors, "customExecutors"); this.ksqlEngine = Objects.requireNonNull(ksqlEngine, "ksqlEngine"); this.ksqlConfig = Objects.requireNonNull(ksqlConfig, "ksqlConfig"); - this.serviceContext = Objects.requireNonNull(serviceContext, "serviceContext"); this.distributor = Objects.requireNonNull(distributor, "distributor"); this.commandQueueSync = Objects.requireNonNull(commandQueueSync, "commandQueueSync"); } public KsqlEntityList execute( + final ServiceContext serviceContext, final List statements, final Map propertyOverrides ) { @@ -83,7 +80,7 @@ public KsqlEntityList execute( for (ParsedStatement parsed : statements) { final PreparedStatement prepared = ksqlEngine.prepare(parsed); if (prepared.getStatement() instanceof RunScript) { - final KsqlEntityList result = executeRunScript(prepared, propertyOverrides); + final KsqlEntityList result = executeRunScript(serviceContext, prepared, propertyOverrides); if (!result.isEmpty()) { // This is to maintain backwards compatibility until we deprecate // RunScript in the next major release - the expected behavior was @@ -93,7 +90,7 @@ public KsqlEntityList execute( } else { final ConfiguredStatement configured = ConfiguredStatement.of( prepared, scopedPropertyOverrides, ksqlConfig); - executeStatement(configured, entities).ifPresent(entities::add); + executeStatement(serviceContext, configured, entities).ifPresent(entities::add); } } return entities; @@ -101,6 +98,7 @@ public KsqlEntityList execute( @SuppressWarnings("unchecked") private Optional executeStatement( + final ServiceContext serviceContext, final ConfiguredStatement configured, final KsqlEntityList entities ) { @@ -117,6 +115,7 @@ private Optional executeStatement( } private KsqlEntityList executeRunScript( + final ServiceContext serviceContext, final PreparedStatement statement, final Map propertyOverrides) { final String sql = (String) propertyOverrides @@ -127,7 +126,7 @@ private KsqlEntityList executeRunScript( "Request is missing script content", statement.getStatementText()); } - return execute(ksqlEngine.parse(sql), propertyOverrides); + return execute(serviceContext, ksqlEngine.parse(sql), propertyOverrides); } } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java index 30de8ab8ff11..7c2b4eb6374c 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java @@ -59,6 +59,7 @@ import javax.ws.rs.POST; import javax.ws.rs.Path; import javax.ws.rs.Produces; +import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; @@ -93,7 +94,6 @@ public class KsqlResource { public KsqlResource( final KsqlConfig ksqlConfig, final KsqlEngine ksqlEngine, - final ServiceContext serviceContext, final CommandQueue commandQueue, final Duration distributedCmdResponseTimeout, final ActivenessRegistrar activenessRegistrar, @@ -110,7 +110,6 @@ public KsqlResource( CustomValidators.VALIDATOR_MAP, injectorFactory, ksqlEngine::createSandbox, - SandboxedServiceContext.create(serviceContext), ksqlConfig); this.handler = new RequestHandler( CustomExecutors.EXECUTOR_MAP, @@ -120,7 +119,6 @@ public KsqlResource( injectorFactory), ksqlEngine, ksqlConfig, - serviceContext, new DefaultCommandQueueSync( commandQueue, KsqlResource::shouldSynchronize, @@ -130,11 +128,14 @@ public KsqlResource( @POST @Path("/terminate") - public Response terminateCluster(final ClusterTerminateRequest request) { + public Response terminateCluster( + @Context final ServiceContext serviceContext, + final ClusterTerminateRequest request + ) { ensureValidPatterns(request.getDeleteTopicList()); try { return Response.ok( - handler.execute(TERMINATE_CLUSTER, request.getStreamsProperties()) + handler.execute(serviceContext, TERMINATE_CLUSTER, request.getStreamsProperties()) ).build(); } catch (final Exception e) { return Errors.serverErrorForStatement( @@ -143,7 +144,10 @@ public Response terminateCluster(final ClusterTerminateRequest request) { } @POST - public Response handleKsqlStatements(final KsqlRequest request) { + public Response handleKsqlStatements( + @Context final ServiceContext serviceContext, + final KsqlRequest request + ) { if (!ksqlEngine.isAcceptingStatements()) { return Errors.serverErrorForStatement( new KsqlException("The cluster has been terminated. No new request will be accepted."), @@ -152,6 +156,7 @@ public Response handleKsqlStatements(final KsqlRequest request) { ); } activenessRegistrar.updateLastRequestTime(); + try { CommandStoreUtil.httpWaitForCommandSequenceNumber( commandQueue, @@ -159,9 +164,18 @@ public Response handleKsqlStatements(final KsqlRequest request) { distributedCmdResponseTimeout); final List statements = ksqlEngine.parse(request.getKsql()); - validator.validate(statements, request.getStreamsProperties(), request.getKsql()); + validator.validate( + SandboxedServiceContext.create(serviceContext), + statements, + request.getStreamsProperties(), + request.getKsql() + ); - final KsqlEntityList entities = handler.execute(statements, request.getStreamsProperties()); + final KsqlEntityList entities = handler.execute( + serviceContext, + statements, + request.getStreamsProperties() + ); return Response.ok(entities).build(); } catch (final KsqlRestException e) { throw e; diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java index d61bc664b3cb..1bd0774a27c0 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java @@ -44,6 +44,7 @@ import javax.ws.rs.POST; import javax.ws.rs.Path; import javax.ws.rs.Produces; +import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import org.slf4j.Logger; @@ -58,7 +59,6 @@ public class StreamedQueryResource { private final KsqlConfig ksqlConfig; private final KsqlEngine ksqlEngine; - private final ServiceContext serviceContext; private final StatementParser statementParser; private final CommandQueue commandQueue; private final Duration disconnectCheckInterval; @@ -69,7 +69,6 @@ public class StreamedQueryResource { public StreamedQueryResource( final KsqlConfig ksqlConfig, final KsqlEngine ksqlEngine, - final ServiceContext serviceContext, final StatementParser statementParser, final CommandQueue commandQueue, final Duration disconnectCheckInterval, @@ -78,7 +77,6 @@ public StreamedQueryResource( ) { this.ksqlConfig = Objects.requireNonNull(ksqlConfig, "ksqlConfig"); this.ksqlEngine = Objects.requireNonNull(ksqlEngine, "ksqlEngine"); - this.serviceContext = Objects.requireNonNull(serviceContext, "serviceContext"); this.statementParser = Objects.requireNonNull(statementParser, "statementParser"); this.commandQueue = Objects.requireNonNull(commandQueue, "commandQueue"); this.disconnectCheckInterval = @@ -91,7 +89,10 @@ public StreamedQueryResource( } @POST - public Response streamQuery(final KsqlRequest request) throws Exception { + public Response streamQuery( + @Context final ServiceContext serviceContext, + final KsqlRequest request + ) throws Exception { if (!ksqlEngine.isAcceptingStatements()) { return Errors.serverErrorForStatement( new KsqlException("Cluster has been terminated."), @@ -106,7 +107,7 @@ public Response streamQuery(final KsqlRequest request) throws Exception { CommandStoreUtil.httpWaitForCommandSequenceNumber( commandQueue, request, commandQueueCatchupTimeout); - return handleStatement(request, statement); + return handleStatement(serviceContext, request, statement); } private PreparedStatement parseStatement(final KsqlRequest request) { @@ -124,6 +125,7 @@ private PreparedStatement parseStatement(final KsqlRequest request) { @SuppressWarnings("unchecked") private Response handleStatement( + final ServiceContext serviceContext, final KsqlRequest request, final PreparedStatement statement ) throws Exception { @@ -134,6 +136,7 @@ private Response handleStatement( if (statement.getStatement() instanceof PrintTopic) { return handlePrintTopic( + serviceContext, request.getStreamsProperties(), (PreparedStatement) statement); } @@ -173,6 +176,7 @@ private Response handleQuery( } private Response handlePrintTopic( + final ServiceContext serviceContext, final Map streamProperties, final PreparedStatement statement ) { diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/RequestValidator.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/RequestValidator.java index 10357ed631a9..3ee258964032 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/RequestValidator.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/RequestValidator.java @@ -54,7 +54,6 @@ public class RequestValidator { private final Map, StatementValidator> customValidators; private final BiFunction injectorFactory; private final Supplier snapshotSupplier; - private final ServiceContext serviceContext; private final KsqlConfig ksqlConfig; /** @@ -63,20 +62,17 @@ public class RequestValidator { * @param snapshotSupplier supplies a snapshot of the current execution state, the * snapshot returned will be owned by this class and changes * to the snapshot should not affect the source and vice versa - * @param serviceContext the {@link ServiceContext} to use * @param ksqlConfig the {@link KsqlConfig} to validate against */ public RequestValidator( final Map, StatementValidator> customValidators, final BiFunction injectorFactory, final Supplier snapshotSupplier, - final ServiceContext serviceContext, final KsqlConfig ksqlConfig ) { this.customValidators = requireNonNull(customValidators, "customValidators"); this.injectorFactory = requireNonNull(injectorFactory, "injectorFactory"); this.snapshotSupplier = requireNonNull(snapshotSupplier, "snapshotSupplier"); - this.serviceContext = requireSandbox(requireNonNull(serviceContext, "serviceContext")); this.ksqlConfig = requireNonNull(ksqlConfig, "ksqlConfig"); } @@ -95,10 +91,13 @@ public RequestValidator( * to support */ public int validate( + final ServiceContext serviceContext, final List statements, final Map propertyOverrides, final String sql ) { + requireSandbox(serviceContext); + validateOverriddenConfigProperties(propertyOverrides); final KsqlExecutionContext ctx = requireSandbox(snapshotSupplier.get()); final Injector injector = injectorFactory.apply(ctx, serviceContext); @@ -110,8 +109,8 @@ public int validate( prepared, propertyOverrides, ksqlConfig); numPersistentQueries += (prepared.getStatement() instanceof RunScript) - ? validateRunScript(configured, ctx) - : validate(configured, ctx, injector); + ? validateRunScript(serviceContext, configured, ctx) + : validate(serviceContext, configured, ctx, injector); } if (QueryCapacityUtil.exceedsPersistentQueryCapacity(ctx, ksqlConfig, numPersistentQueries)) { @@ -128,6 +127,7 @@ public int validate( */ @SuppressWarnings("unchecked") private int validate( + final ServiceContext serviceContext, final ConfiguredStatement configured, final KsqlExecutionContext executionContext, final Injector injector @@ -152,6 +152,7 @@ private int validate( } private int validateRunScript( + final ServiceContext serviceContext, final ConfiguredStatement statement, final KsqlExecutionContext executionContext) { final String sql = (String) statement.getOverrides() @@ -166,7 +167,7 @@ private int validateRunScript( + "Note: RUN SCRIPT is deprecated and will be removed in the next major version. " + "statement: " + statement.getStatementText()); - return validate(executionContext.parse(sql), statement.getOverrides(), sql); + return validate(serviceContext, executionContext.parse(sql), statement.getOverrides(), sql); } private static void validateOverriddenConfigProperties( diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java index 4476989e810b..aea51072a903 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java @@ -169,7 +169,6 @@ private class KsqlServer { this.ksqlResource = new KsqlResource( ksqlConfig, ksqlEngine, - serviceContext, fakeCommandQueue, Duration.ofMillis(0), ()->{}, @@ -201,7 +200,7 @@ void executeCommands() { void submitCommands(final String ...statements) { for (final String statement : statements) { - final Response response = ksqlResource.handleKsqlStatements( + final Response response = ksqlResource.handleKsqlStatements(serviceContext, new KsqlRequest(statement, Collections.emptyMap(), null)); assertThat(response.getStatus(), equalTo(200)); executeCommands(); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/RequestHandlerTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/RequestHandlerTest.java index df564036637a..842b8d105e87 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/RequestHandlerTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/RequestHandlerTest.java @@ -96,7 +96,7 @@ public void shouldUseCustomExecutor() { // When final List statements = new DefaultKsqlParser().parse("CREATE STREAM x WITH (kafka_topic='x');"); - final KsqlEntityList entities = handler.execute(statements, ImmutableMap.of()); + final KsqlEntityList entities = handler.execute(serviceContext, statements, ImmutableMap.of()); // Then assertThat(entities, contains(entity)); @@ -118,7 +118,7 @@ public void shouldDefaultToDistributor() { // When final List statements = new DefaultKsqlParser().parse("CREATE STREAM x WITH (kafka_topic='x');"); - final KsqlEntityList entities = handler.execute(statements, ImmutableMap.of()); + final KsqlEntityList entities = handler.execute(serviceContext, statements, ImmutableMap.of()); // Then assertThat(entities, contains(entity)); @@ -140,7 +140,11 @@ public void shouldDistributeProperties() { // When final List statements = new DefaultKsqlParser().parse("CREATE STREAM x WITH (kafka_topic='x');"); - final KsqlEntityList entities = handler.execute(statements, ImmutableMap.of("x", "y")); + final KsqlEntityList entities = handler.execute( + serviceContext, + statements, + ImmutableMap.of("x", "y") + ); // Then assertThat(entities, contains(entity)); @@ -175,7 +179,7 @@ public void shouldWaitForDistributedStatements() { ); // When - handler.execute(statements, ImmutableMap.of()); + handler.execute(serviceContext, statements, ImmutableMap.of()); // Then verify(sync).waitFor(argThat(hasItems(entity1, entity2)), any()); @@ -199,7 +203,7 @@ public void shouldInlineRunScriptStatements() { // When: final List statements = new DefaultKsqlParser() .parse("RUN SCRIPT '/some/script.sql';" ); - handler.execute(statements, props); + handler.execute(serviceContext, statements, props); // Then: verify(customExecutor, times(1)) @@ -229,7 +233,7 @@ public void shouldOnlyReturnLastInRunScript() { // When: final List statements = new DefaultKsqlParser() .parse("RUN SCRIPT '/some/script.sql';" ); - final KsqlEntityList result = handler.execute(statements, props); + final KsqlEntityList result = handler.execute(serviceContext, statements, props); // Then: assertThat(result, contains(entity2)); @@ -242,7 +246,6 @@ private void givenRequestHandler( distributor, ksqlEngine, ksqlConfig, - serviceContext, sync ); } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java index 1d3fa0e24162..d67b157c6f1d 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java @@ -43,18 +43,14 @@ import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.isIn; import static org.hamcrest.Matchers.not; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; import static org.mockito.hamcrest.MockitoHamcrest.argThat; import com.google.common.collect.ImmutableList; @@ -135,13 +131,7 @@ import io.confluent.rest.RestConfig; import java.io.IOException; import java.time.Duration; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Properties; +import java.util.*; import java.util.concurrent.TimeoutException; import java.util.function.Function; import java.util.stream.Collectors; @@ -1499,7 +1489,7 @@ public void shouldReturnServiceUnavailableIfTimeoutWaitingForCommandSequenceNumb @Test public void shouldUpdateTheLastRequestTime() { // When: - ksqlResource.handleKsqlStatements(VALID_EXECUTABLE_REQUEST); + ksqlResource.handleKsqlStatements(serviceContext, VALID_EXECUTABLE_REQUEST); // Then: verify(activenessRegistrar).updateLastRequestTime(); @@ -1508,7 +1498,10 @@ public void shouldUpdateTheLastRequestTime() { @Test public void shouldHandleTerminateRequestCorrectly() { // When: - final Response response = ksqlResource.terminateCluster(VALID_TERMINATE_REQUEST); + final Response response = ksqlResource.terminateCluster( + serviceContext, + VALID_TERMINATE_REQUEST + ); // Then: assertThat(response.getStatus(), equalTo(200)); @@ -1534,7 +1527,10 @@ public void shouldFailIfCannotWriteTerminateCommand() { when(commandStore.enqueueCommand(any())).thenThrow(new KsqlException("")); // When: - final Response response = ksqlResource.terminateCluster(VALID_TERMINATE_REQUEST); + final Response response = ksqlResource.terminateCluster( + serviceContext, + VALID_TERMINATE_REQUEST + ); // Then: assertThat(response.getStatus(), equalTo(500)); @@ -1556,7 +1552,7 @@ public void shouldFailTerminateOnInvalidDeleteTopicPattern() { "Invalid pattern: [Invalid Regex")))); // When: - ksqlResource.terminateCluster(request); + ksqlResource.terminateCluster(serviceContext, request); } @Test @@ -1787,7 +1783,7 @@ private KsqlErrorMessage makeFailingRequestWithSequenceNumber( private KsqlErrorMessage makeFailingRequest(final KsqlRequest ksqlRequest, final Code errorCode) { try { - final Response response = ksqlResource.handleKsqlStatements(ksqlRequest); + final Response response = ksqlResource.handleKsqlStatements(serviceContext, ksqlRequest); assertThat(response.getStatus(), is(errorCode.getCode())); assertThat(response.getEntity(), instanceOf(KsqlErrorMessage.class)); return (KsqlErrorMessage) response.getEntity(); @@ -1847,7 +1843,7 @@ private List makeMultipleRequest( final KsqlRequest ksqlRequest, final Class expectedEntityType) { - final Response response = ksqlResource.handleKsqlStatements(ksqlRequest); + final Response response = ksqlResource.handleKsqlStatements(serviceContext, ksqlRequest); if (response.getStatus() != Response.Status.OK.getStatusCode()) { throw new KsqlRestException(response); } @@ -1888,7 +1884,6 @@ private void setUpKsqlResource() { ksqlResource = new KsqlResource( ksqlConfig, ksqlEngine, - serviceContext, commandStore, DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT, activenessRegistrar, diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/StreamedQueryResourceTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/StreamedQueryResourceTest.java index 6b911bfac560..e3c0b24401bd 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/StreamedQueryResourceTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/StreamedQueryResourceTest.java @@ -65,7 +65,10 @@ import java.io.PipedInputStream; import java.io.PipedOutputStream; import java.time.Duration; +import java.util.Arrays; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.LinkedList; import java.util.Map; import java.util.Scanner; @@ -73,6 +76,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import java.util.function.Supplier; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; import javax.ws.rs.core.StreamingOutput; @@ -86,6 +90,7 @@ import org.easymock.Mock; import org.easymock.MockType; import org.eclipse.jetty.http.HttpStatus.Code; +import org.hamcrest.Matchers; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -138,7 +143,6 @@ public void setup() { testResource = new StreamedQueryResource( ksqlConfig, mockKsqlEngine, - serviceContext, mockStatementParser, commandQueue, DISCONNECT_CHECK_INTERVAL, @@ -156,7 +160,10 @@ public void shouldFailIfIsNotAcceptingStatements() throws Exception { // When: final Response response = - testResource.streamQuery(new KsqlRequest(queryString, Collections.emptyMap(), null)); + testResource.streamQuery( + serviceContext, + new KsqlRequest(queryString, Collections.emptyMap(), null) + ); // Then: assertThat(response.getStatus(), equalTo(Status.INTERNAL_SERVER_ERROR.getStatusCode())); @@ -182,7 +189,10 @@ public void shouldReturn400OnBadStatement() throws Exception { exceptionErrorMessage(errorCode(is(Errors.ERROR_CODE_BAD_STATEMENT)))); // When: - testResource.streamQuery(new KsqlRequest("query", Collections.emptyMap(), null)); + testResource.streamQuery( + serviceContext, + new KsqlRequest("query", Collections.emptyMap(), null) + ); } @Test @@ -191,7 +201,10 @@ public void shouldNotWaitIfCommandSequenceNumberSpecified() throws Exception { replay(commandQueue); // When: - testResource.streamQuery(new KsqlRequest(queryString, Collections.emptyMap(), null)); + testResource.streamQuery( + serviceContext, + new KsqlRequest(queryString, Collections.emptyMap(), null) + ); // Then: verify(commandQueue); @@ -206,7 +219,10 @@ public void shouldWaitIfCommandSequenceNumberSpecified() throws Exception { replay(commandQueue); // When: - testResource.streamQuery(new KsqlRequest(queryString, Collections.emptyMap(), 3L)); + testResource.streamQuery( + serviceContext, + new KsqlRequest(queryString, Collections.emptyMap(), 3L) + ); // Then: verify(commandQueue); @@ -230,7 +246,10 @@ public void shouldReturnServiceUnavailableIfTimeoutWaitingForCommandSequenceNumb exceptionErrorMessage(errorCode(is(Errors.ERROR_CODE_COMMAND_QUEUE_CATCHUP_TIMEOUT)))); // When: - testResource.streamQuery(new KsqlRequest(queryString, Collections.emptyMap(), 3L)); + testResource.streamQuery( + serviceContext, + new KsqlRequest(queryString, Collections.emptyMap(), 3L) + ); } @SuppressWarnings("unchecked") @@ -310,7 +329,10 @@ public void shouldStreamRowsCorrectly() throws Throwable { replay(mockKsqlEngine, mockStatementParser, mockKafkaStreams, mockOutputNode); final Response response = - testResource.streamQuery(new KsqlRequest(queryString, requestStreamsProperties, null)); + testResource.streamQuery( + serviceContext, + new KsqlRequest(queryString, requestStreamsProperties, null) + ); final PipedOutputStream responseOutputStream = new EOFPipedOutputStream(); final PipedInputStream responseInputStream = new PipedInputStream(responseOutputStream, 1); final StreamingOutput responseStream = (StreamingOutput) response.getEntity(); @@ -432,7 +454,10 @@ public void shouldUpdateTheLastRequestTime() throws Exception { EasyMock.replay(activenessRegistrar); // When: - testResource.streamQuery(new KsqlRequest(queryString, Collections.emptyMap(), null)); + testResource.streamQuery( + serviceContext, + new KsqlRequest(queryString, Collections.emptyMap(), null) + ); // Then: EasyMock.verify(activenessRegistrar); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/validation/RequestValidatorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/validation/RequestValidatorTest.java index 0fc29020cebf..66eaa36c7704 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/validation/RequestValidatorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/validation/RequestValidatorTest.java @@ -129,7 +129,7 @@ public void shouldCallStatementValidator() { givenParsed("CREATE STREAM x WITH (kafka_topic='x');"); // When: - validator.validate(statements, ImmutableMap.of(), "sql"); + validator.validate(serviceContext, statements, ImmutableMap.of(), "sql"); // Then: verify(statementValidator, times(1)).validate( @@ -145,7 +145,7 @@ public void shouldExecuteOnEngineIfNoCustomExecutor() { final List statements = givenParsed("SET 'property'='value';"); // When: - validator.validate(statements, ImmutableMap.of(), "sql"); + validator.validate(serviceContext, statements, ImmutableMap.of(), "sql"); // Then: verify(ksqlEngine, times(1)).execute( @@ -170,7 +170,7 @@ public void shouldThrowExceptionIfValidationFails() { expectedException.expectMessage("Fail"); // When: - validator.validate(statements, ImmutableMap.of(), "sql"); + validator.validate(serviceContext, statements, ImmutableMap.of(), "sql"); } @Test @@ -184,7 +184,7 @@ public void shouldThrowIfNoValidatorAvailable() { expectedException.expectMessage("Do not know how to validate statement"); // When: - validator.validate(statements, ImmutableMap.of(), "sql"); + validator.validate(serviceContext, statements, ImmutableMap.of(), "sql"); } @Test @@ -203,7 +203,7 @@ public void shouldThrowIfTooManyPersistentQueries() { expectedException.expectMessage("persistent queries to exceed the configured limit"); // When: - validator.validate(statements, ImmutableMap.of(), "sql"); + validator.validate(serviceContext, statements, ImmutableMap.of(), "sql"); } @Test @@ -224,7 +224,7 @@ public void shouldNotThrowIfManyNonPersistentQueries() { // Expect Nothing: // When: - validator.validate(statements, ImmutableMap.of(), "sql"); + validator.validate(serviceContext, statements, ImmutableMap.of(), "sql"); } @Test @@ -241,7 +241,7 @@ public void shouldValidateRunScript() { final List statements = givenParsed("RUN SCRIPT '/some/script.sql';"); // When: - validator.validate(statements, props, "sql"); + validator.validate(serviceContext, statements, props, "sql"); // Then: verify(statementValidator, times(1)).validate( @@ -270,7 +270,7 @@ public void shouldThrowIfInvalidOverriddenProperty() { expectedException.expectMessage("Invalid config property: invalid.property"); // When: - validator.validate(statements, props, "sql"); + validator.validate(serviceContext, statements, props, "sql"); } @Test @@ -287,7 +287,7 @@ public void shouldValidateForValidOverriddenProperty() { ); // When: - validator.validate(statements, props, "sql"); + validator.validate(serviceContext, statements, props, "sql"); // Then: verify(statementValidator, times(1)).validate( @@ -302,13 +302,14 @@ public void shouldValidateForValidOverriddenProperty() { public void shouldThrowIfServiceContextIsNotSandbox() { // Given: serviceContext = mock(ServiceContext.class); + givenRequestValidator(ImmutableMap.of()); // Expect: expectedException.expect(IllegalArgumentException.class); expectedException.expectMessage("Expected sandbox"); // When: - givenRequestValidator(ImmutableMap.of()); + validator.validate(serviceContext, ImmutableList.of(), ImmutableMap.of(), "sql"); } @Test @@ -322,7 +323,7 @@ public void shouldThrowIfSnapshotSupplierReturnsNonSandbox() { expectedException.expectMessage("Expected sandbox"); // When: - validator.validate(ImmutableList.of(), ImmutableMap.of(), "sql"); + validator.validate(serviceContext, ImmutableList.of(), ImmutableMap.of(), "sql"); } private List givenParsed(final String sql) { @@ -336,7 +337,6 @@ private void givenRequestValidator( customValidators, (ec, sc) -> InjectorChain.of(schemaInjector, topicInjector), () -> executionContext, - serviceContext, ksqlConfig ); }