Skip to content

Commit

Permalink
chore: move restart logic from ServerVerticle to Server
Browse files Browse the repository at this point in the history
  • Loading branch information
vcrfxia committed Jun 1, 2020
1 parent 0ffaf29 commit 612b215
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ public class Server {
private final Map<PushQueryId, PushQueryHolder> queries = new ConcurrentHashMap<>();
private final Set<HttpConnection> connections = new ConcurrentHashSet<>();
private final int maxPushQueryCount;
private final Set<ServerVerticle> serverVerticles = new HashSet<>();
private final Set<String> deploymentIds = new HashSet<>();
private final KsqlSecurityExtension securityExtension;
private final Optional<AuthenticationPlugin> authenticationPlugin;
Expand Down Expand Up @@ -122,7 +121,6 @@ public synchronized void start() {
listener.getScheme().equalsIgnoreCase("https")),
this, isInternalListener);
vertx.deployVerticle(serverVerticle, vcf);
serverVerticles.add(serverVerticle);
final int index = i;
final CompletableFuture<String> deployFuture = vcf.thenApply(s -> {
if (index == 0) {
Expand All @@ -140,7 +138,7 @@ public synchronized void start() {
}
}

configureTlsCertReload(config, serverVerticles);
configureTlsCertReload(config);

final CompletableFuture<Void> allDeployFuture = CompletableFuture.allOf(deployFutures
.toArray(new CompletableFuture<?>[0]));
Expand Down Expand Up @@ -184,6 +182,11 @@ public synchronized void stop() {
log.info("API server stopped");
}

public void restart() {
stop();
start();
}

public WorkerExecutor getWorkerExecutor() {
return workerExecutor;
}
Expand Down Expand Up @@ -245,6 +248,28 @@ public synchronized Optional<URI> getInternalListener() {
return Optional.ofNullable(internalListener);
}

private void configureTlsCertReload(final KsqlRestConfig config) {
if (config.getBoolean(KsqlRestConfig.SSL_KEYSTORE_RELOAD_CONFIG)) {
final Path watchLocation;
if (!config.getString(KsqlRestConfig.SSL_KEYSTORE_WATCH_LOCATION_CONFIG).isEmpty()) {
watchLocation = Paths.get(
config.getString(KsqlRestConfig.SSL_KEYSTORE_WATCH_LOCATION_CONFIG));
} else {
watchLocation = Paths.get(config.getString(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG));
}

try {
FileWatcher.onFileChange(
watchLocation,
this::restart
);
log.info("Enabled SSL cert auto reload for: " + watchLocation);
} catch (java.io.IOException e) {
log.error("Can not enabled SSL cert auto reload", e);
}
}
}

private static HttpServerOptions createHttpServerOptions(final KsqlRestConfig ksqlRestConfig,
final String host, final int port, final boolean tls) {

Expand Down Expand Up @@ -344,29 +369,4 @@ private static List<URI> parseListenerStrings(
}
return listeners;
}

private static void configureTlsCertReload(
final KsqlRestConfig config,
final Set<ServerVerticle> serverVerticles
) {
if (config.getBoolean(KsqlRestConfig.SSL_KEYSTORE_RELOAD_CONFIG)) {
final Path watchLocation;
if (!config.getString(KsqlRestConfig.SSL_KEYSTORE_WATCH_LOCATION_CONFIG).isEmpty()) {
watchLocation = Paths.get(
config.getString(KsqlRestConfig.SSL_KEYSTORE_WATCH_LOCATION_CONFIG));
} else {
watchLocation = Paths.get(config.getString(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG));
}

try {
FileWatcher.onFileChange(
watchLocation,
() -> serverVerticles.forEach(ServerVerticle::restartServer)
);
log.info("Enabled SSL cert auto reload for: " + watchLocation);
} catch (java.io.IOException e) {
log.error("Can not enabled SSL cert auto reload", e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ public class ServerVerticle extends AbstractVerticle {
private final HttpServerOptions httpServerOptions;
private final Server server;
private ConnectionQueryManager connectionQueryManager;
private Router router;
private HttpServer httpServer;
private final Optional<Boolean> isInternalListener;

Expand All @@ -80,8 +79,8 @@ public ServerVerticle(
@Override
public void start(final Promise<Void> startPromise) {
this.connectionQueryManager = new ConnectionQueryManager(context, server);
router = setupRouter();
httpServer = createHttpServer();
httpServer = vertx.createHttpServer(httpServerOptions).requestHandler(setupRouter())
.exceptionHandler(ServerVerticle::unhandledExceptionHandler);
httpServer.listen(ar -> {
if (ar.succeeded()) {
startPromise.complete();
Expand All @@ -100,39 +99,10 @@ public void stop(final Promise<Void> stopPromise) {
}
}

// Creates a new server, rather than simply stopping and restarting, in order to reload TLS certs
public void restartServer() {
final String host = httpServerOptions.getHost();
final int port = httpServer.actualPort();

log.info("Restarting server");
httpServer.close(closeAR -> {
if (closeAR.succeeded()) {
log.info("Server has closed");
} else {
log.error("Error closing server during restart", closeAR.cause());
}

httpServer = createHttpServer();
httpServer.listen(port, host, startAR -> {
if (startAR.succeeded()) {
log.info("Server has restarted");
} else {
log.error("Error starting server during restart", startAR.cause());
}
});
});
}

int actualPort() {
return httpServer.actualPort();
}

private HttpServer createHttpServer() {
return vertx.createHttpServer(httpServerOptions).requestHandler(router)
.exceptionHandler(ServerVerticle::unhandledExceptionHandler);
}

private Router setupRouter() {
final Router router = Router.router(vertx);

Expand Down

0 comments on commit 612b215

Please sign in to comment.