Skip to content

Commit

Permalink
fix: reject requests to new API server if server is not ready (#5048)
Browse files Browse the repository at this point in the history
  • Loading branch information
vcrfxia authored Apr 15, 2020
1 parent 8c0a92a commit d988722
Show file tree
Hide file tree
Showing 12 changed files with 191 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,18 @@ private static <T> void handlePortedOldApiRequest(final RoutingContext routingCo

private static void oldApiFailureHandler(final RoutingContext routingContext) {
final int statusCode = routingContext.statusCode();
final KsqlErrorMessage ksqlErrorMessage = new KsqlErrorMessage(
toErrorCode(statusCode),
routingContext.failure().getMessage());

final KsqlErrorMessage ksqlErrorMessage;
if (routingContext.failure() instanceof KsqlApiException) {
final KsqlApiException ksqlApiException = (KsqlApiException) routingContext.failure();
ksqlErrorMessage = new KsqlErrorMessage(
ksqlApiException.getErrorCode(),
ksqlApiException.getMessage());
} else {
ksqlErrorMessage = new KsqlErrorMessage(
toErrorCode(statusCode),
routingContext.failure().getMessage());
}
try {
final byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(ksqlErrorMessage);
routingContext.response().setStatusCode(statusCode)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.common.collect.ImmutableList;
import io.confluent.ksql.api.auth.AuthenticationPlugin;
import io.confluent.ksql.api.spi.Endpoints;
import io.confluent.ksql.rest.server.state.ServerState;
import io.confluent.ksql.security.KsqlSecurityExtension;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.VertxCompletableFuture;
Expand Down Expand Up @@ -64,18 +65,21 @@ public class Server {
private final boolean proxyEnabled;
private final KsqlSecurityExtension securityExtension;
private final Optional<AuthenticationPlugin> authenticationPlugin;
private final ServerState serverState;
private WorkerExecutor workerExecutor;
private int jettyPort = -1;
private List<URI> listeners = new ArrayList<>();

public Server(final Vertx vertx, final ApiServerConfig config, final Endpoints endpoints,
final boolean proxyEnabled, final KsqlSecurityExtension securityExtension,
final Optional<AuthenticationPlugin> authenticationPlugin) {
final Optional<AuthenticationPlugin> authenticationPlugin,
final ServerState serverState) {
this.vertx = Objects.requireNonNull(vertx);
this.config = Objects.requireNonNull(config);
this.endpoints = Objects.requireNonNull(endpoints);
this.securityExtension = Objects.requireNonNull(securityExtension);
this.authenticationPlugin = Objects.requireNonNull(authenticationPlugin);
this.serverState = Objects.requireNonNull(serverState);
this.maxPushQueryCount = config.getInt(ApiServerConfig.MAX_PUSH_QUERIES);
this.proxyEnabled = proxyEnabled;
}
Expand Down Expand Up @@ -207,6 +211,10 @@ Optional<AuthenticationPlugin> getAuthenticationPlugin() {
return authenticationPlugin;
}

ServerState getServerState() {
return serverState;
}

public ApiServerConfig getConfig() {
return config;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.api.server;

import io.confluent.ksql.rest.entity.KsqlErrorMessage;
import io.confluent.ksql.rest.server.state.ServerState;
import io.vertx.core.Handler;
import io.vertx.ext.web.RoutingContext;
import java.util.Objects;
import java.util.Optional;
import javax.ws.rs.core.Response;
import org.eclipse.jetty.http.HttpStatus;

public class ServerStateHandler implements Handler<RoutingContext> {

private final ServerState serverState;

ServerStateHandler(final ServerState serverState) {
this.serverState = Objects.requireNonNull(serverState, "serverState");
}

@Override
public void handle(final RoutingContext routingContext) {
final Optional<Response> response = serverState.checkReady();
if (response.isPresent()) {
final KsqlErrorMessage errorMsg = (KsqlErrorMessage) response.get().getEntity();
routingContext.fail(
HttpStatus.SERVICE_UNAVAILABLE_503,
new KsqlApiException(errorMsg.getMessage(), errorMsg.getErrorCode())
);
} else {
routingContext.next();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ private Router setupRouter() {

setupAuthHandlers(router);

setUpServerStateHandlers(router);

router.route(HttpMethod.POST, "/query-stream")
.produces("application/vnd.ksqlapi.delimited.v1")
.produces("application/json")
Expand Down Expand Up @@ -273,6 +275,13 @@ private static AuthHandler basicAuthHandler(final Server server) {
return basicAuthHandler;
}

private void setUpServerStateHandlers(final Router router) {
// This will require special handling when removing the proxy server as only endpoints
// defined in this repo (rather than in custom plugins) should reject requests based
// on server state
routeToNonProxiedEndpoints(router, new ServerStateHandler(server.getServerState()));
}

private static void pauseHandler(final RoutingContext routingContext) {
// prevent auth handler from reading request body
routingContext.request().pause();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ void startApiServer(final KsqlConfig ksqlConfigWithPort) {
);
apiServerConfig = new ApiServerConfig(ksqlConfigWithPort.originals());
apiServer = new Server(vertx, apiServerConfig, endpoints, true, securityExtension,
authenticationPlugin);
authenticationPlugin, serverState);
apiServer.start();
log.info("KSQL New API Server started");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.confluent.ksql.api.server.Server;
import io.confluent.ksql.api.utils.InsertsResponse;
import io.confluent.ksql.api.utils.QueryResponse;
import io.confluent.ksql.rest.server.state.ServerState;
import io.confluent.ksql.security.KsqlAuthorizationProvider;
import io.confluent.ksql.security.KsqlSecurityExtension;
import io.confluent.ksql.security.KsqlUserContextProvider;
Expand Down Expand Up @@ -131,7 +132,8 @@ public Optional<KsqlUserContextProvider> getUserContextProvider() {
public void close() {
}
},
Optional.ofNullable(securityHandlerPlugin));
Optional.ofNullable(securityHandlerPlugin),
serverState);
server.start();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.confluent.ksql.api.utils.ListRowGenerator;
import io.confluent.ksql.api.utils.QueryResponse;
import io.confluent.ksql.api.utils.ReceiveStream;
import io.confluent.ksql.rest.server.state.ServerState;
import io.confluent.ksql.security.KsqlDefaultSecurityExtension;
import io.confluent.ksql.util.VertxCompletableFuture;
import io.vertx.core.Vertx;
Expand Down Expand Up @@ -69,15 +70,17 @@ public class BaseApiTest {
protected WebClient client;
protected Server server;
protected TestEndpoints testEndpoints;
protected ServerState serverState;

@Before
public void setUp() {

vertx = Vertx.vertx();
vertx.exceptionHandler(t -> log.error("Unhandled exception in Vert.x", t));

testEndpoints = new TestEndpoints();
ApiServerConfig serverConfig = createServerConfig();
serverState = new ServerState();
serverState.setReady();
createServer(serverConfig);
this.client = createClient();
setDefaultRowGenerator();
Expand Down Expand Up @@ -114,7 +117,7 @@ protected void stopClient() {

protected void createServer(ApiServerConfig serverConfig) {
server = new Server(vertx, serverConfig, testEndpoints, false,
new KsqlDefaultSecurityExtension(), Optional.empty());
new KsqlDefaultSecurityExtension(), Optional.empty(), serverState);
server.start();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.hamcrest.Matchers.nullValue;

import io.confluent.ksql.api.server.ApiServerConfig;
import io.confluent.ksql.rest.server.state.ServerState;
import io.confluent.ksql.util.VertxCompletableFuture;
import io.vertx.core.MultiMap;
import io.vertx.core.Vertx;
Expand Down Expand Up @@ -59,6 +60,8 @@ public void setUp() {
vertx.exceptionHandler(t -> log.error("Unhandled exception in Vert.x", t));

testEndpoints = new TestEndpoints();
serverState = new ServerState();
serverState.setReady();
setDefaultRowGenerator();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.api;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

import io.confluent.ksql.api.utils.QueryResponse;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.entity.KsqlErrorMessage;
import io.vertx.core.buffer.Buffer;
import io.vertx.ext.web.client.HttpResponse;
import org.junit.Test;

public class ServerStateTest extends BaseApiTest {

private static final int INITIALIZING_ERROR_CODE = 50333;
private static final String INITIALIZING_MESSAGE = "waiting for preconditions";

@Test
public void shouldReturn503IfInitializing() throws Exception {
// Given:
this.serverState.setInitializingReason(new KsqlErrorMessage(INITIALIZING_ERROR_CODE, INITIALIZING_MESSAGE));

// When/Then:
validateResponse(503, INITIALIZING_ERROR_CODE, INITIALIZING_MESSAGE);
}

@Test
public void shouldReturn503IfTerminating() throws Exception {
// Given:
this.serverState.setTerminating();

// When/Then:
validateResponse(503, Errors.ERROR_CODE_SERVER_SHUTTING_DOWN, "The server is shutting down");
}

private void validateResponse(
final int expectedStatus,
final int expectedErrorCode,
final String expectedMessage
) throws Exception {
// When:
HttpResponse<Buffer> response = sendRequest(
"/query-stream",
DEFAULT_PUSH_QUERY_REQUEST_BODY.toBuffer()
);

// Then
assertThat(response.statusCode(), is(expectedStatus));

QueryResponse queryResponse = new QueryResponse(response.bodyAsString());
validateError(expectedErrorCode, expectedMessage, queryResponse.responseObject);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.confluent.ksql.api.server.ApiServerConfig;
import io.confluent.ksql.api.server.Server;
import io.confluent.ksql.api.spi.Endpoints;
import io.confluent.ksql.rest.server.state.ServerState;
import io.confluent.ksql.security.KsqlDefaultSecurityExtension;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpVersion;
Expand Down Expand Up @@ -167,9 +168,10 @@ private void doRun() throws Throwable {
private void setUp() {
vertx = Vertx.vertx();
ApiServerConfig serverConfig = createServerConfig();
final ServerState serverState = new ServerState();
serverState.setReady();
server = new Server(vertx, serverConfig, endpoints, false, new KsqlDefaultSecurityExtension(),
Optional
.empty());
Optional.empty(), serverState);
server.start();
client = createClient();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import io.confluent.common.utils.IntegrationTest;
import io.confluent.ksql.integration.IntegrationTestHarness;
import io.confluent.ksql.integration.Retry;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.entity.ClusterTerminateRequest;
import io.confluent.ksql.rest.entity.KsqlErrorMessage;
import io.confluent.ksql.rest.server.TestKsqlRestApp;
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.PageViewDataProvider;
Expand Down Expand Up @@ -100,6 +102,19 @@ public void shouldCleanUpSinkTopicsAndSchemasDuringClusterTermination() throws E
TEST_HARNESS.topicExists(PAGE_VIEW_TOPIC),
is(true)
);

// Then:
shouldReturn50303WhenTerminating();
}

private void shouldReturn50303WhenTerminating() {
// Given: TERMINATE CLUSTER has been issued

// When:
final KsqlErrorMessage error = RestIntegrationTestUtil.makeKsqlRequestWithError(REST_APP, "SHOW STREAMS;");

// Then:
assertThat(error.getErrorCode(), is(Errors.ERROR_CODE_SERVER_SHUTTING_DOWN));
}

private static void terminateCluster(final List<String> deleteTopicList) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,20 @@ static List<KsqlEntity> makeKsqlRequest(
}
}

static KsqlErrorMessage makeKsqlRequestWithError(
final TestKsqlRestApp restApp,
final String sql
) {
try (final KsqlRestClient restClient = restApp.buildKsqlClient(Optional.empty())) {

final RestResponse<KsqlEntityList> res = restClient.makeKsqlRequest(sql);

throwOnNoError(res);

return res.getErrorMessage();
}
}

static List<StreamedRow> makeQueryRequest(
final TestKsqlRestApp restApp,
final String sql,
Expand Down

0 comments on commit d988722

Please sign in to comment.