diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/Server.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/Server.java index 4980f04f6f65..3b0028ac18a8 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/Server.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/Server.java @@ -70,7 +70,6 @@ public class Server { private final Map queries = new ConcurrentHashMap<>(); private final Set connections = new ConcurrentHashSet<>(); private final int maxPushQueryCount; - private final Set serverVerticles = new HashSet<>(); private final Set deploymentIds = new HashSet<>(); private final KsqlSecurityExtension securityExtension; private final Optional authenticationPlugin; @@ -122,7 +121,6 @@ public synchronized void start() { listener.getScheme().equalsIgnoreCase("https")), this, isInternalListener); vertx.deployVerticle(serverVerticle, vcf); - serverVerticles.add(serverVerticle); final int index = i; final CompletableFuture deployFuture = vcf.thenApply(s -> { if (index == 0) { @@ -140,7 +138,7 @@ public synchronized void start() { } } - configureTlsCertReload(config, serverVerticles); + configureTlsCertReload(config); final CompletableFuture allDeployFuture = CompletableFuture.allOf(deployFutures .toArray(new CompletableFuture[0])); @@ -184,6 +182,11 @@ public synchronized void stop() { log.info("API server stopped"); } + public void restart() { + stop(); + start(); + } + public WorkerExecutor getWorkerExecutor() { return workerExecutor; } @@ -245,6 +248,28 @@ public synchronized Optional getInternalListener() { return Optional.ofNullable(internalListener); } + private void configureTlsCertReload(final KsqlRestConfig config) { + if (config.getBoolean(KsqlRestConfig.SSL_KEYSTORE_RELOAD_CONFIG)) { + final Path watchLocation; + if (!config.getString(KsqlRestConfig.SSL_KEYSTORE_WATCH_LOCATION_CONFIG).isEmpty()) { + watchLocation = Paths.get( + config.getString(KsqlRestConfig.SSL_KEYSTORE_WATCH_LOCATION_CONFIG)); + } else { + watchLocation = Paths.get(config.getString(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)); + } + + try { + FileWatcher.onFileChange( + watchLocation, + this::restart + ); + log.info("Enabled SSL cert auto reload for: " + watchLocation); + } catch (java.io.IOException e) { + log.error("Can not enabled SSL cert auto reload", e); + } + } + } + private static HttpServerOptions createHttpServerOptions(final KsqlRestConfig ksqlRestConfig, final String host, final int port, final boolean tls) { @@ -344,29 +369,4 @@ private static List parseListenerStrings( } return listeners; } - - private static void configureTlsCertReload( - final KsqlRestConfig config, - final Set serverVerticles - ) { - if (config.getBoolean(KsqlRestConfig.SSL_KEYSTORE_RELOAD_CONFIG)) { - final Path watchLocation; - if (!config.getString(KsqlRestConfig.SSL_KEYSTORE_WATCH_LOCATION_CONFIG).isEmpty()) { - watchLocation = Paths.get( - config.getString(KsqlRestConfig.SSL_KEYSTORE_WATCH_LOCATION_CONFIG)); - } else { - watchLocation = Paths.get(config.getString(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)); - } - - try { - FileWatcher.onFileChange( - watchLocation, - () -> serverVerticles.forEach(ServerVerticle::restartServer) - ); - log.info("Enabled SSL cert auto reload for: " + watchLocation); - } catch (java.io.IOException e) { - log.error("Can not enabled SSL cert auto reload", e); - } - } - } } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/ServerVerticle.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/ServerVerticle.java index 3126a7eb6635..14c8c6983112 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/ServerVerticle.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/ServerVerticle.java @@ -62,7 +62,6 @@ public class ServerVerticle extends AbstractVerticle { private final HttpServerOptions httpServerOptions; private final Server server; private ConnectionQueryManager connectionQueryManager; - private Router router; private HttpServer httpServer; private final Optional isInternalListener; @@ -80,8 +79,8 @@ public ServerVerticle( @Override public void start(final Promise startPromise) { this.connectionQueryManager = new ConnectionQueryManager(context, server); - router = setupRouter(); - httpServer = createHttpServer(); + httpServer = vertx.createHttpServer(httpServerOptions).requestHandler(setupRouter()) + .exceptionHandler(ServerVerticle::unhandledExceptionHandler); httpServer.listen(ar -> { if (ar.succeeded()) { startPromise.complete(); @@ -100,39 +99,10 @@ public void stop(final Promise stopPromise) { } } - // Creates a new server, rather than simply stopping and restarting, in order to reload TLS certs - public void restartServer() { - final String host = httpServerOptions.getHost(); - final int port = httpServer.actualPort(); - - log.info("Restarting server"); - httpServer.close(closeAR -> { - if (closeAR.succeeded()) { - log.info("Server has closed"); - } else { - log.error("Error closing server during restart", closeAR.cause()); - } - - httpServer = createHttpServer(); - httpServer.listen(port, host, startAR -> { - if (startAR.succeeded()) { - log.info("Server has restarted"); - } else { - log.error("Error starting server during restart", startAR.cause()); - } - }); - }); - } - int actualPort() { return httpServer.actualPort(); } - private HttpServer createHttpServer() { - return vertx.createHttpServer(httpServerOptions).requestHandler(router) - .exceptionHandler(ServerVerticle::unhandledExceptionHandler); - } - private Router setupRouter() { final Router router = Router.router(vertx);