diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/properties/PropertiesUtil.java b/ksqldb-common/src/main/java/io/confluent/ksql/properties/PropertiesUtil.java index 7fc92aa6aa7b..2e54c27b1730 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/properties/PropertiesUtil.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/properties/PropertiesUtil.java @@ -15,6 +15,7 @@ package io.confluent.ksql.properties; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap.Builder; import com.google.common.collect.ImmutableSet; @@ -76,9 +77,24 @@ public static Properties asProperties(final Map mapProps) { * @return an immutable map of the loaded properties. */ public static Map loadProperties(final File propertiesFile) { - final Map properties = loadPropsFromFile(propertiesFile); + return loadProperties(ImmutableList.of(propertiesFile)); + } + + /** + * Load a list of property files. Properties are loaded from the first entry + * in the list to the last, meaning that any properties specified in later + * files take precedence. + * + * @param propertiesFiles the property files to load. + * @return an immutable map of the loaded properties. + */ + public static Map loadProperties(final List propertiesFiles) { + final Map properties = new HashMap<>(); + for (final File propertiesFile : propertiesFiles) { + properties.putAll(loadPropsFromFile(propertiesFile)); + } throwOnBlackListedProperties(properties); - return properties; + return ImmutableMap.copyOf(properties); } /** diff --git a/ksqldb-common/src/test/java/io/confluent/ksql/properties/PropertiesUtilTest.java b/ksqldb-common/src/test/java/io/confluent/ksql/properties/PropertiesUtilTest.java index b27804881741..2fb3eacdcda7 100644 --- a/ksqldb-common/src/test/java/io/confluent/ksql/properties/PropertiesUtilTest.java +++ b/ksqldb-common/src/test/java/io/confluent/ksql/properties/PropertiesUtilTest.java @@ -26,10 +26,12 @@ import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertThrows; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import io.confluent.ksql.test.util.KsqlTestFolder; import io.confluent.ksql.util.KsqlException; import java.io.File; +import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.util.Map; @@ -69,6 +71,28 @@ public void shouldLoadPropsFromFile() { assertThat(result.get("some.other.prop"), is("124")); } + @Test + public void shouldLoadPropsFromMultipleFiles() throws IOException { + // Given: + givenPropsFileContains( + "# Comment" + System.lineSeparator() + + "some.prop=some value" + System.lineSeparator() + + "some.other.prop=124" + System.lineSeparator() + ); + File other = TMP.newFile(); + givenPropsFileContains(other, "some.prop=other value" + System.lineSeparator() + "a=b"); + + // When: + final Map result = PropertiesUtil.loadProperties( + ImmutableList.of(propsFile, other) + ); + + // Then: + assertThat(result.get("some.prop"), is("other value")); + assertThat(result.get("some.other.prop"), is("124")); + assertThat(result.get("a"), is("b")); + } + @Test public void shouldLoadImmutablePropsFromFile() { // Given: @@ -217,6 +241,10 @@ public void shouldNotThrowOnUnkownPropertyFromCoerceTypesWithIgnore() { } private void givenPropsFileContains(final String contents) { + givenPropsFileContains(propsFile, contents); + } + + private void givenPropsFileContains(final File propsFile, final String contents) { try { Files.write(propsFile.toPath(), contents.getBytes(StandardCharsets.UTF_8)); } catch (final Exception e) { diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/LoggingRateLimiter.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/LoggingRateLimiter.java index 35c5ad138d88..f60c6dbbd1c8 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/LoggingRateLimiter.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/LoggingRateLimiter.java @@ -28,7 +28,7 @@ import java.util.function.Function; import org.slf4j.Logger; -class LoggingRateLimiter { +public class LoggingRateLimiter { // Print "You hit a rate limit" every 5 seconds private static final double LIMIT_HIT_LOG_RATE = 0.2; diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/PreconditionVerticle.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/PreconditionVerticle.java new file mode 100644 index 000000000000..3b9129817ee8 --- /dev/null +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/PreconditionVerticle.java @@ -0,0 +1,78 @@ +/* + * Copyright 2022 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.api.server; + + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import io.confluent.ksql.api.util.ApiServerUtils; +import io.confluent.ksql.rest.server.state.ServerState; +import io.vertx.core.AbstractVerticle; +import io.vertx.core.Promise; +import io.vertx.core.http.HttpMethod; +import io.vertx.core.http.HttpServer; +import io.vertx.core.http.HttpServerOptions; +import io.vertx.ext.web.Router; +import java.util.Objects; + +/** + * The server deploys multiple server verticles. This is where the HTTP2 requests are handled. The + * actual implementation of the endpoints is provided by an implementation of {@code Endpoints}. + */ +public class PreconditionVerticle extends AbstractVerticle { + private final HttpServerOptions httpServerOptions; + private final ServerState serverState; + private HttpServer httpServer; + + @SuppressFBWarnings(value = "EI_EXPOSE_REP2") + public PreconditionVerticle( + final HttpServerOptions httpServerOptions, + final ServerState serverState + ) { + this.httpServerOptions = Objects.requireNonNull(httpServerOptions); + this.serverState = Objects.requireNonNull(serverState, "serverState"); + } + + @Override + public void start(final Promise startPromise) { + httpServer = vertx.createHttpServer(httpServerOptions).requestHandler(setupRouter()) + .exceptionHandler(ApiServerUtils::unhandledExceptionHandler); + httpServer.listen(ar -> { + if (ar.succeeded()) { + startPromise.complete(); + } else { + startPromise.fail(ar.cause()); + } + }); + } + + @Override + public void stop(final Promise stopPromise) { + if (httpServer == null) { + stopPromise.complete(); + } else { + httpServer.close(stopPromise.future()); + } + } + + private Router setupRouter() { + final Router router = Router.router(vertx); + router.route(HttpMethod.GET, "/chc/ready").handler(ApiServerUtils::chcHandler); + router.route(HttpMethod.GET, "/chc/live").handler(ApiServerUtils::chcHandler); + router.route().handler(new ServerStateHandler(serverState)); + router.route().failureHandler(new FailureHandler()); + return router; + } +} diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/Server.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/Server.java index a8a63dc4b461..09c7c469e9fc 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/Server.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/Server.java @@ -1,5 +1,5 @@ /* - * Copyright 2020 Confluent Inc. + * Copyright 2022 Confluent Inc. * * Licensed under the Confluent Community License (the "License"); you may not use * this file except in compliance with the License. You may obtain a copy of the @@ -21,8 +21,8 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.confluent.ksql.api.auth.AuthenticationPlugin; import io.confluent.ksql.api.spi.Endpoints; +import io.confluent.ksql.api.util.ApiServerUtils; import io.confluent.ksql.internal.PullQueryExecutorMetrics; -import io.confluent.ksql.properties.PropertiesUtil; import io.confluent.ksql.rest.entity.PushQueryId; import io.confluent.ksql.rest.server.KsqlRestConfig; import io.confluent.ksql.rest.server.state.ServerState; @@ -30,7 +30,6 @@ import io.confluent.ksql.util.FileWatcher; import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.VertxCompletableFuture; -import io.confluent.ksql.util.VertxSslOptionsFactory; import io.netty.handler.ssl.OpenSsl; import io.vertx.core.Vertx; import io.vertx.core.WorkerExecutor; @@ -38,8 +37,6 @@ import io.vertx.core.http.HttpConnection; import io.vertx.core.http.HttpServerOptions; import io.vertx.core.impl.ConcurrentHashSet; -import io.vertx.core.net.JksOptions; -import io.vertx.core.net.PfxOptions; import java.net.URI; import java.net.URISyntaxException; import java.nio.file.Path; @@ -54,7 +51,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; -import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.SslConfigs; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -118,7 +114,7 @@ public synchronized void start() { final LoggingRateLimiter loggingRateLimiter = new LoggingRateLimiter(config); configureTlsCertReload(config); - final List listenUris = parseListeners(config); + final List listenUris = ApiServerUtils.parseListeners(config); final Optional internalListenUri = parseInternalListener(config, listenUris); final List allListenUris = new ArrayList<>(listenUris); internalListenUri.ifPresent(allListenUris::add); @@ -311,91 +307,11 @@ private static HttpServerOptions createHttpServerOptions(final KsqlRestConfig ks : ksqlRestConfig.getClientAuth(); final String alias = ksqlRestConfig.getString(ksConfigName); - setTlsOptions(ksqlRestConfig, options, alias, clientAuth); + ApiServerUtils.setTlsOptions(ksqlRestConfig, options, alias, clientAuth); } return options; } - private static void setTlsOptions( - final KsqlRestConfig ksqlRestConfig, - final HttpServerOptions options, - final String keyStoreAlias, - final ClientAuth clientAuth - ) { - options.setUseAlpn(true).setSsl(true); - if (ksqlRestConfig.getBoolean(KsqlRestConfig.KSQL_SERVER_SNI_CHECK_ENABLE)) { - options.setSni(true); - } - - configureTlsKeyStore(ksqlRestConfig, options, keyStoreAlias); - configureTlsTrustStore(ksqlRestConfig, options); - - final List enabledProtocols = - ksqlRestConfig.getList(KsqlRestConfig.SSL_ENABLED_PROTOCOLS_CONFIG); - if (!enabledProtocols.isEmpty()) { - options.setEnabledSecureTransportProtocols(new HashSet<>(enabledProtocols)); - } - - final List cipherSuites = - ksqlRestConfig.getList(KsqlRestConfig.SSL_CIPHER_SUITES_CONFIG); - if (!cipherSuites.isEmpty()) { - // Vert.x does not yet support a method for setting cipher suites, so we use the following - // workaround instead. See https://github.com/eclipse-vertx/vert.x/issues/1507. - final Set enabledCipherSuites = options.getEnabledCipherSuites(); - enabledCipherSuites.clear(); - enabledCipherSuites.addAll(cipherSuites); - } - - options.setClientAuth(clientAuth); - } - - private static void configureTlsKeyStore( - final KsqlRestConfig ksqlRestConfig, - final HttpServerOptions httpServerOptions, - final String keyStoreAlias - ) { - final Map props = PropertiesUtil.toMapStrings(ksqlRestConfig.originals()); - final String keyStoreType = ksqlRestConfig.getString(KsqlRestConfig.SSL_KEYSTORE_TYPE_CONFIG); - - if (keyStoreType.equals(KsqlRestConfig.SSL_STORE_TYPE_JKS)) { - final Optional keyStoreOptions = - VertxSslOptionsFactory.buildJksKeyStoreOptions(props, Optional.ofNullable(keyStoreAlias)); - - keyStoreOptions.ifPresent(options -> httpServerOptions.setKeyStoreOptions(options)); - } else if (keyStoreType.equals(KsqlRestConfig.SSL_STORE_TYPE_PKCS12)) { - final Optional keyStoreOptions = - VertxSslOptionsFactory.getPfxKeyStoreOptions(props); - - keyStoreOptions.ifPresent(options -> httpServerOptions.setPfxKeyCertOptions(options)); - } - } - - private static void configureTlsTrustStore( - final KsqlRestConfig ksqlRestConfig, - final HttpServerOptions httpServerOptions - ) { - final Map props = PropertiesUtil.toMapStrings(ksqlRestConfig.originals()); - final String trustStoreType = - ksqlRestConfig.getString(KsqlRestConfig.SSL_TRUSTSTORE_TYPE_CONFIG); - - if (trustStoreType.equals(KsqlRestConfig.SSL_STORE_TYPE_JKS)) { - final Optional trustStoreOptions = - VertxSslOptionsFactory.getJksTrustStoreOptions(props); - - trustStoreOptions.ifPresent(options -> httpServerOptions.setTrustOptions(options)); - } else if (trustStoreType.equals(KsqlRestConfig.SSL_STORE_TYPE_PKCS12)) { - final Optional trustStoreOptions = - VertxSslOptionsFactory.getPfxTrustStoreOptions(props); - - trustStoreOptions.ifPresent(options -> httpServerOptions.setTrustOptions(options)); - } - } - - private static List parseListeners(final KsqlRestConfig config) { - final List sListeners = config.getList(KsqlRestConfig.LISTENERS_CONFIG); - return parseListenerStrings(config, sListeners); - } - private static Optional parseInternalListener( final KsqlRestConfig config, final List listenUris @@ -403,7 +319,7 @@ private static Optional parseInternalListener( if (config.getString(KsqlRestConfig.INTERNAL_LISTENER_CONFIG) == null) { return Optional.empty(); } - final URI uri = parseListenerStrings(config, + final URI uri = ApiServerUtils.parseListenerStrings(config, ImmutableList.of(config.getString(KsqlRestConfig.INTERNAL_LISTENER_CONFIG))).get(0); if (listenUris.contains(uri)) { return Optional.empty(); @@ -411,30 +327,4 @@ private static Optional parseInternalListener( return Optional.of(uri); } } - - private static List parseListenerStrings( - final KsqlRestConfig config, - final List stringListeners) { - final List listeners = new ArrayList<>(); - for (String listenerName : stringListeners) { - try { - final URI uri = new URI(listenerName); - final String scheme = uri.getScheme(); - if (!"http".equalsIgnoreCase(scheme) && !"https".equalsIgnoreCase(scheme)) { - throw new ConfigException("Invalid URI scheme should be http or https: " + listenerName); - } - if ("https".equalsIgnoreCase(scheme)) { - final String keyStoreLocation = config - .getString(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG); - if (keyStoreLocation == null || keyStoreLocation.isEmpty()) { - throw new ConfigException("https listener specified but no keystore provided"); - } - } - listeners.add(uri); - } catch (URISyntaxException e) { - throw new ConfigException("Invalid listener URI: " + listenerName); - } - } - return listeners; - } } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/util/ApiServerUtils.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/util/ApiServerUtils.java new file mode 100644 index 000000000000..d8b1a765a417 --- /dev/null +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/util/ApiServerUtils.java @@ -0,0 +1,197 @@ +/* + * Copyright 2022 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.api.util; + +import io.confluent.ksql.properties.PropertiesUtil; +import io.confluent.ksql.rest.server.KsqlRestConfig; +import io.confluent.ksql.util.FileWatcher; +import io.confluent.ksql.util.FileWatcher.Callback; +import io.confluent.ksql.util.VertxSslOptionsFactory; +import io.vertx.core.http.ClientAuth; +import io.vertx.core.http.HttpHeaders; +import io.vertx.core.http.HttpServerOptions; +import io.vertx.core.json.JsonObject; +import io.vertx.core.net.JksOptions; +import io.vertx.core.net.PfxOptions; +import io.vertx.ext.web.RoutingContext; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.channels.ClosedChannelException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.config.SslConfigs; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class ApiServerUtils { + private static final Logger LOG = LoggerFactory.getLogger(ApiServerUtils.class); + + private ApiServerUtils() { + } + + + public static void unhandledExceptionHandler(final Throwable t) { + if (t instanceof ClosedChannelException) { + LOG.debug("Unhandled ClosedChannelException (connection likely closed early)", t); + } else { + LOG.error("Unhandled exception", t); + } + } + + public static void chcHandler(final RoutingContext routingContext) { + routingContext.response().putHeader(HttpHeaders.CONTENT_TYPE.toString(), "application/json") + .end(new JsonObject().toBuffer()); + } + + public static FileWatcher configureTlsCertReload( + final KsqlRestConfig config, + final Callback onChange + ) { + if (config.getBoolean(KsqlRestConfig.SSL_KEYSTORE_RELOAD_CONFIG)) { + final Path watchLocation; + if (!config.getString(KsqlRestConfig.SSL_KEYSTORE_WATCH_LOCATION_CONFIG).isEmpty()) { + watchLocation = Paths.get( + config.getString(KsqlRestConfig.SSL_KEYSTORE_WATCH_LOCATION_CONFIG)); + } else { + watchLocation = Paths.get(config.getString(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)); + } + + FileWatcher fileWatcher = null; + try { + fileWatcher = new FileWatcher(watchLocation, onChange); + fileWatcher.start(); + LOG.info("Enabled SSL cert auto reload for: " + watchLocation); + } catch (java.io.IOException e) { + LOG.error("Failed to enable SSL cert auto reload", e); + } + return fileWatcher; + } + return null; + } + + public static List parseListenerStrings( + final KsqlRestConfig config, + final List stringListeners) { + final List listeners = new ArrayList<>(); + for (String listenerName : stringListeners) { + try { + final URI uri = new URI(listenerName); + final String scheme = uri.getScheme(); + if (!"http".equalsIgnoreCase(scheme) && !"https".equalsIgnoreCase(scheme)) { + throw new ConfigException("Invalid URI scheme should be http or https: " + listenerName); + } + if ("https".equalsIgnoreCase(scheme)) { + final String keyStoreLocation = config + .getString(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG); + if (keyStoreLocation == null || keyStoreLocation.isEmpty()) { + throw new ConfigException("https listener specified but no keystore provided"); + } + } + listeners.add(uri); + } catch (URISyntaxException e) { + throw new ConfigException("Invalid listener URI: " + listenerName); + } + } + return listeners; + } + + public static List parseListeners(final KsqlRestConfig config) { + final List sListeners = config.getList(KsqlRestConfig.LISTENERS_CONFIG); + return parseListenerStrings(config, sListeners); + } + + public static void setTlsOptions( + final KsqlRestConfig ksqlRestConfig, + final HttpServerOptions options, + final String keyStoreAlias, + final ClientAuth clientAuth + ) { + options.setUseAlpn(true).setSsl(true); + if (ksqlRestConfig.getBoolean(KsqlRestConfig.KSQL_SERVER_SNI_CHECK_ENABLE)) { + options.setSni(true); + } + + configureTlsKeyStore(ksqlRestConfig, options, keyStoreAlias); + configureTlsTrustStore(ksqlRestConfig, options); + + final List enabledProtocols = + ksqlRestConfig.getList(KsqlRestConfig.SSL_ENABLED_PROTOCOLS_CONFIG); + if (!enabledProtocols.isEmpty()) { + options.setEnabledSecureTransportProtocols(new HashSet<>(enabledProtocols)); + } + + final List cipherSuites = + ksqlRestConfig.getList(KsqlRestConfig.SSL_CIPHER_SUITES_CONFIG); + if (!cipherSuites.isEmpty()) { + // Vert.x does not yet support a method for setting cipher suites, so we use the following + // workaround instead. See https://github.com/eclipse-vertx/vert.x/issues/1507. + final Set enabledCipherSuites = options.getEnabledCipherSuites(); + enabledCipherSuites.clear(); + enabledCipherSuites.addAll(cipherSuites); + } + + options.setClientAuth(clientAuth); + } + + private static void configureTlsKeyStore( + final KsqlRestConfig ksqlRestConfig, + final HttpServerOptions httpServerOptions, + final String keyStoreAlias + ) { + final Map props = PropertiesUtil.toMapStrings(ksqlRestConfig.originals()); + final String keyStoreType = ksqlRestConfig.getString(KsqlRestConfig.SSL_KEYSTORE_TYPE_CONFIG); + + if (keyStoreType.equals(KsqlRestConfig.SSL_STORE_TYPE_JKS)) { + final Optional keyStoreOptions = + VertxSslOptionsFactory.buildJksKeyStoreOptions(props, Optional.ofNullable(keyStoreAlias)); + + keyStoreOptions.ifPresent(options -> httpServerOptions.setKeyStoreOptions(options)); + } else if (keyStoreType.equals(KsqlRestConfig.SSL_STORE_TYPE_PKCS12)) { + final Optional keyStoreOptions = + VertxSslOptionsFactory.getPfxKeyStoreOptions(props); + + keyStoreOptions.ifPresent(options -> httpServerOptions.setPfxKeyCertOptions(options)); + } + } + + private static void configureTlsTrustStore( + final KsqlRestConfig ksqlRestConfig, + final HttpServerOptions httpServerOptions + ) { + final Map props = PropertiesUtil.toMapStrings(ksqlRestConfig.originals()); + final String trustStoreType = + ksqlRestConfig.getString(KsqlRestConfig.SSL_TRUSTSTORE_TYPE_CONFIG); + + if (trustStoreType.equals(KsqlRestConfig.SSL_STORE_TYPE_JKS)) { + final Optional trustStoreOptions = + VertxSslOptionsFactory.getJksTrustStoreOptions(props); + + trustStoreOptions.ifPresent(options -> httpServerOptions.setTrustOptions(options)); + } else if (trustStoreType.equals(KsqlRestConfig.SSL_STORE_TYPE_PKCS12)) { + final Optional trustStoreOptions = + VertxSslOptionsFactory.getPfxTrustStoreOptions(props); + + trustStoreOptions.ifPresent(options -> httpServerOptions.setTrustOptions(options)); + } + } +} diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index 0c0fd74394e4..a4389b1e0b2c 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -60,7 +60,6 @@ import io.confluent.ksql.rest.client.KsqlClient; import io.confluent.ksql.rest.client.RestResponse; import io.confluent.ksql.rest.entity.KsqlEntityList; -import io.confluent.ksql.rest.entity.KsqlErrorMessage; import io.confluent.ksql.rest.entity.SourceInfo; import io.confluent.ksql.rest.entity.StreamsList; import io.confluent.ksql.rest.server.HeartbeatAgent.Builder; @@ -115,7 +114,6 @@ import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.KsqlServerException; import io.confluent.ksql.util.ReservedInternalTopics; -import io.confluent.ksql.util.RetryUtil; import io.confluent.ksql.util.WelcomeMsgUtils; import io.confluent.ksql.utilization.PersistentQuerySaturationMetrics; import io.confluent.ksql.version.metrics.KsqlVersionCheckerAgent; @@ -150,7 +148,6 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Function; -import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; @@ -187,7 +184,6 @@ public final class KsqlRestApplication implements Executable { private final Optional authenticationPlugin; private final ServerState serverState; private final ProcessingLogContext processingLogContext; - private final List preconditions; private final List configurables; private final Consumer rocksDBConfigSetterHandler; private final Optional heartbeatAgent; @@ -238,7 +234,6 @@ public static SourceName getCommandsStreamName() { final Optional authenticationPlugin, final ServerState serverState, final ProcessingLogContext processingLogContext, - final List preconditions, final List configurables, final Consumer rocksDBConfigSetterHandler, final Optional heartbeatAgent, @@ -265,7 +260,6 @@ public static SourceName getCommandsStreamName() { this.commandStore = requireNonNull(commandStore, "commandStore"); this.serverState = requireNonNull(serverState, "serverState"); this.processingLogContext = requireNonNull(processingLogContext, "processingLogContext"); - this.preconditions = requireNonNull(preconditions, "preconditions"); this.versionCheckerAgent = requireNonNull(versionCheckerAgent, "versionCheckerAgent"); this.ksqlSecurityContextProvider = requireNonNull(ksqlSecurityContextProvider, "ksqlSecurityContextProvider"); @@ -390,7 +384,6 @@ public void startAsync() { @VisibleForTesting void startKsql(final KsqlConfig ksqlConfigWithPort) { - waitForPreconditions(); cleanupOldState(); initialize(ksqlConfigWithPort); } @@ -414,43 +407,6 @@ private AbortApplicationStartException(final String message) { } } - private void checkPreconditions() { - for (final KsqlServerPrecondition precondition : preconditions) { - final Optional error = precondition.checkPrecondition( - restConfig, - serviceContext, - internalTopicClient - ); - if (error.isPresent()) { - serverState.setInitializingReason(error.get()); - throw new KsqlFailedPrecondition(error.get().toString()); - } - } - } - - private void waitForPreconditions() { - final List> predicates = ImmutableList.of( - e -> !(e instanceof KsqlFailedPrecondition) - ); - try { - RetryUtil.retryWithBackoff( - Integer.MAX_VALUE, - 1000, - 30000, - this::checkPreconditions, - terminatedFuture::isDone, - predicates - ); - } catch (KsqlFailedPrecondition e) { - log.error("Failed to meet preconditions. Exiting...", e); - } - - if (terminatedFuture.isDone()) { - throw new AbortApplicationStartException( - "Shutting down application during waitForPreconditions"); - } - } - private void cleanupOldState() { localCommands.ifPresent(lc -> lc.processLocalCommandFiles(serviceContext)); } @@ -627,6 +583,7 @@ Optional getInternalListener() { public static KsqlRestApplication buildApplication( final KsqlRestConfig restConfig, + final ServerState serverState, final MetricCollectors metricCollectors) { final Map updatedRestProps = restConfig.getOriginals(); @@ -669,6 +626,7 @@ public static KsqlRestApplication buildApplication( return buildApplication( "", updatedRestConfig, + serverState, KsqlVersionCheckerAgent::new, Integer.MAX_VALUE, serviceContext, @@ -687,6 +645,7 @@ public static KsqlRestApplication buildApplication( static KsqlRestApplication buildApplication( final String metricsPrefix, final KsqlRestConfig restConfig, + final ServerState serverState, final Function, VersionCheckerAgent> versionCheckerFactory, final int maxStatementRetries, final ServiceContext serviceContext, @@ -821,8 +780,6 @@ static KsqlRestApplication buildApplication( final VersionCheckerAgent versionChecker = versionCheckerFactory.apply(ksqlEngine::hasActiveQueries); - final ServerState serverState = new ServerState(); - final KsqlSecurityExtension securityExtension = loadSecurityExtension(ksqlConfig); final KsqlSecurityContextProvider ksqlSecurityContextProvider = @@ -968,11 +925,6 @@ static KsqlRestApplication buildApplication( denyListPropertyValidator ); - final List preconditions = restConfig.getConfiguredInstances( - KsqlRestConfig.KSQL_SERVER_PRECONDITIONS, - KsqlServerPrecondition.class - ); - final List configurables = ImmutableList.of( ksqlResource, ksqlEngine @@ -997,7 +949,6 @@ static KsqlRestApplication buildApplication( securityHandlerPlugin, serverState, processingLogContext, - preconditions, configurables, rocksDBConfigSetterHandler, heartbeatAgent, diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java index 30be10da916a..6a13b2ca3995 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java @@ -214,7 +214,7 @@ public class KsqlRestConfig extends AbstractConfig { private static final String KSQL_WEBSOCKETS_NUM_THREADS_DOC = "The number of websocket threads to handle query results"; - static final String KSQL_SERVER_PRECONDITIONS = + public static final String KSQL_SERVER_PRECONDITIONS = KSQL_CONFIG_PREFIX + "server.preconditions"; private static final String KSQL_SERVER_PRECONDITIONS_DOC = "A comma separated list of classes implementing KsqlServerPrecondition. The KSQL server " diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerMain.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerMain.java index ba368762d154..7e3d62cd97af 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerMain.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerMain.java @@ -19,6 +19,7 @@ import io.confluent.ksql.logging.query.QueryLogger; import io.confluent.ksql.metrics.MetricCollectors; import io.confluent.ksql.properties.PropertiesUtil; +import io.confluent.ksql.rest.server.state.ServerState; import io.confluent.ksql.serde.FormatFactory; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; @@ -30,16 +31,21 @@ import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; import org.apache.kafka.streams.StreamsConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +// CHECKSTYLE_RULES.OFF: ClassDataAbstractionCoupling public class KsqlServerMain { + // CHECKSTYLE_RULES.ON: ClassDataAbstractionCoupling private static final Logger log = LoggerFactory.getLogger(KsqlServerMain.class); private final Executor shutdownHandler; - private final Executable executable; + private final Executable preconditionChecker; + private final Supplier executable; public static void main(final String[] args) { try { @@ -48,10 +54,12 @@ public static void main(final String[] args) { return; } - final Map properties = PropertiesUtil.applyOverrides( - PropertiesUtil.loadProperties(serverOptions.getPropertiesFile()), - System.getProperties() - ); + final Supplier> propertiesLoader = + () -> PropertiesUtil.applyOverrides( + PropertiesUtil.loadProperties(serverOptions.getPropertiesFile()), + System.getProperties() + ); + final Map properties = propertiesLoader.get(); final String installDir = properties.getOrDefault("ksql.server.install.dir", ""); final KsqlConfig ksqlConfig = new KsqlConfig(properties); @@ -60,10 +68,18 @@ public static void main(final String[] args) { final Optional queriesFile = serverOptions.getQueriesFile(properties); final MetricCollectors metricCollectors = new MetricCollectors(); - final Executable executable = createExecutable( - properties, queriesFile, installDir, ksqlConfig, metricCollectors); + final ServerState serverState = new ServerState(); + final Executable preconditionChecker = new PreconditionChecker(propertiesLoader, serverState); + final Supplier executableFactory = () -> createExecutable( + propertiesLoader, + serverState, + queriesFile, + installDir, + metricCollectors + ); new KsqlServerMain( - executable, + preconditionChecker, + executableFactory, r -> Runtime.getRuntime().addShutdownHook(new Thread(r)) ).tryStartApp(); } catch (final Exception e) { @@ -72,16 +88,30 @@ public static void main(final String[] args) { } } - KsqlServerMain(final Executable executable, final Executor shutdownHandler) { - this.executable = Objects.requireNonNull(executable, "executable"); + KsqlServerMain( + final Executable preconditionChecker, + final Supplier executableFactory, + final Executor shutdownHandler) { + this.preconditionChecker = Objects.requireNonNull(preconditionChecker, "preconditionChecker"); + this.executable = Objects.requireNonNull(executableFactory, "executableFactory"); this.shutdownHandler = Objects.requireNonNull(shutdownHandler, "shutdownHandler"); } void tryStartApp() throws Exception { + final boolean shutdown = runExecutable(preconditionChecker); + if (shutdown) { + return; + } + runExecutable(executable.get()); + } + + boolean runExecutable(final Executable executable) throws Exception { final CountDownLatch latch = new CountDownLatch(1); + final AtomicBoolean notified = new AtomicBoolean(false); shutdownHandler.execute(() -> { executable.notifyTerminated(); try { + notified.set(true); latch.await(); } catch (final InterruptedException e) { throw new RuntimeException(e); @@ -103,6 +133,7 @@ void tryStartApp() throws Exception { } finally { latch.countDown(); } + return notified.get(); } private static void validateConfig(final KsqlConfig config) { @@ -142,12 +173,15 @@ private static void validateTopicFormat( @SuppressWarnings("OptionalUsedAsFieldOrParameterType") private static Executable createExecutable( - final Map properties, + final Supplier> propertiesLoader, + final ServerState serverState, final Optional queriesFile, final String installDir, - final KsqlConfig ksqlConfig, final MetricCollectors metricCollectors - ) throws IOException { + ) { + final Map properties = propertiesLoader.get(); + final KsqlConfig ksqlConfig = new KsqlConfig(properties); + if (queriesFile.isPresent()) { return StandaloneExecutorFactory.create( properties, @@ -159,7 +193,7 @@ private static Executable createExecutable( final KsqlRestConfig restConfig = new KsqlRestConfig(properties); final Executable restApp = KsqlRestApplication - .buildApplication(restConfig, metricCollectors); + .buildApplication(restConfig, serverState, metricCollectors); final String connectConfigFile = ksqlConfig.getString(KsqlConfig.CONNECT_WORKER_CONFIG_FILE_PROPERTY); @@ -167,8 +201,12 @@ private static Executable createExecutable( return restApp; } - final Executable connect = ConnectExecutable.of(connectConfigFile); - return MultiExecutable.of(connect, restApp); + try { + final Executable connect = ConnectExecutable.of(connectConfigFile); + return MultiExecutable.of(connect, restApp); + } catch (final IOException e) { + throw new RuntimeException(e); + } } @VisibleForTesting diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerPrecondition.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerPrecondition.java index 4f84f3af0933..51d05c685bec 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerPrecondition.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerPrecondition.java @@ -18,19 +18,20 @@ import io.confluent.ksql.rest.entity.KsqlErrorMessage; import io.confluent.ksql.services.KafkaTopicClient; import io.confluent.ksql.services.ServiceContext; +import java.util.Map; import java.util.Optional; public interface KsqlServerPrecondition { /** * Check a precondition for initializing the KSQL server. * - * @param config The KSQL server config + * @param properties The current ksql server properties (raw) * @param serviceContext The KSQL server context for accessing external serivces * @return Optional.empty() if precondition check passes, non-empty KsqlErrorMessage object if the * check does not pass. */ Optional checkPrecondition( - KsqlRestConfig config, + Map properties, ServiceContext serviceContext, KafkaTopicClient internalTopicClient ); diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/PreconditionChecker.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/PreconditionChecker.java new file mode 100644 index 000000000000..8cb761412c52 --- /dev/null +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/PreconditionChecker.java @@ -0,0 +1,242 @@ +/* + * Copyright 2022 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.server; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.ksql.rest.client.KsqlClient; +import io.confluent.ksql.rest.entity.KsqlErrorMessage; +import io.confluent.ksql.rest.server.services.InternalKsqlClientFactory; +import io.confluent.ksql.rest.server.services.RestServiceContextFactory; +import io.confluent.ksql.rest.server.state.ServerState; +import io.confluent.ksql.schema.registry.KsqlSchemaRegistryClientFactory; +import io.confluent.ksql.services.ConnectClientFactory; +import io.confluent.ksql.services.DefaultConnectClientFactory; +import io.confluent.ksql.services.KafkaTopicClient; +import io.confluent.ksql.services.KafkaTopicClientImpl; +import io.confluent.ksql.services.LazyServiceContext; +import io.confluent.ksql.services.ServiceContext; +import io.confluent.ksql.util.KsqlConfig; +import io.confluent.ksql.util.RetryUtil; +import io.vertx.core.Vertx; +import io.vertx.core.VertxOptions; +import io.vertx.core.net.SocketAddress; +import java.net.URI; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; +import java.util.function.Supplier; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +// CHECKSTYLE_RULES.OFF: ClassDataAbstractionCoupling +public class PreconditionChecker implements Executable { + // CHECKSTYLE_RULES.ON: ClassDataAbstractionCoupling + + private static final Logger LOG = LoggerFactory.getLogger(PreconditionChecker.class); + + final ServiceContext serviceContext; + final KsqlRestConfig restConfig; + final Supplier> propertiesLoader; + final KafkaTopicClient topicClient; + final Vertx vertx; + final List preconditions; + final PreconditionServer server; + final ServerState serverState; + private final CompletableFuture terminatedFuture = new CompletableFuture<>(); + + public PreconditionChecker( + final Supplier> propertiesLoader, + final ServerState serverState + ) { + this.propertiesLoader = Objects.requireNonNull(propertiesLoader, "propertiesLoader"); + final Map properties = propertiesLoader.get(); + this.restConfig = new KsqlRestConfig(properties); + this.serviceContext = buildServiceContext(propertiesLoader); + this.serverState = Objects.requireNonNull(serverState, "serverState"); + this.topicClient = new KafkaTopicClientImpl( + () -> createCommandTopicAdminClient( + new KsqlRestConfig(propertiesLoader.get()), new KsqlConfig(propertiesLoader.get()))); + this.preconditions = restConfig.getConfiguredInstances( + KsqlRestConfig.KSQL_SERVER_PRECONDITIONS, + KsqlServerPrecondition.class + ); + this.vertx = Vertx.vertx( + new VertxOptions() + .setMaxWorkerExecuteTimeUnit(TimeUnit.MILLISECONDS) + .setMaxWorkerExecuteTime(Long.MAX_VALUE)); + this.server = new PreconditionServer( + vertx, + restConfig, + serverState + ); + } + + @VisibleForTesting + PreconditionChecker( + final Supplier> propertiesLoader, + final ServiceContext serviceContext, + final KsqlRestConfig restConfig, + final KafkaTopicClient topicClient, + final Vertx vertx, + final List preconditions, + final PreconditionServer server, + final ServerState state + ) { + this.propertiesLoader = Objects.requireNonNull(propertiesLoader, "propertiesLoader"); + this.serviceContext = Objects.requireNonNull(serviceContext, "serviceContext"); + this.restConfig = Objects.requireNonNull(restConfig, "restConfig"); + this.topicClient = Objects.requireNonNull(topicClient, "topicClient"); + this.vertx = Objects.requireNonNull(vertx, "vertx"); + this.preconditions = Objects.requireNonNull(preconditions, "preconditions"); + this.server = Objects.requireNonNull(server, "server"); + this.serverState = Objects.requireNonNull(state, "state"); + } + + private boolean shouldCheckPreconditions() { + return preconditions.stream() + .map(p -> p.checkPrecondition(propertiesLoader.get(), serviceContext, topicClient)) + .peek( + r -> r.ifPresent(rr -> LOG.info("Precondition failed: {}", rr)) + ) + .anyMatch(Optional::isPresent); + } + + /** + * Maybe start a precondition server. A precondition server is start if any of the configured + * preconditions fail. Otherwise, no server is started. The precondition server responds to all + * requests with 503, other than the liveness and readiness probes. + */ + @Override + public void startAsync() { + if (!shouldCheckPreconditions()) { + LOG.info("All preconditions passed, skipping precondition server start"); + return; + } + LOG.info("Some preconditions not passed, starting precondition server"); + server.start(); + } + + @Override + public void notifyTerminated() { + terminatedFuture.complete(null); + } + + @Override + public void shutdown() { + if (server.started()) { + server.stop(); + } + vertx.close(); + } + + /** + * Wait until either all preconditions evaluate successfully, or the process is asked + * to exit (by calling notifyTermianted) + */ + @Override + public void awaitTerminated() { + final List> predicates = ImmutableList.of( + e -> !(e instanceof KsqlFailedPrecondition) + ); + RetryUtil.retryWithBackoff( + Integer.MAX_VALUE, + 1000, + 30000, + this::checkPreconditions, + terminatedFuture::isDone, + predicates + ); + } + + public List getListeners() { + return server.getListeners(); + } + + /** + * Checks all preconditions. This is called first to decide whether or not the precondition + * server needs to run. Then, it's called indefinitely until all preconditions pass. + */ + private void checkPreconditions() { + LOG.info("Checking preconditions..."); + for (final KsqlServerPrecondition precondition : preconditions) { + final Optional error = precondition.checkPrecondition( + propertiesLoader.get(), + serviceContext, + topicClient + ); + if (error.isPresent()) { + LOG.info("Precondition failed: {}", error.get()); + serverState.setInitializingReason(error.get()); + throw new KsqlFailedPrecondition(error.get().toString()); + } + } + } + + @VisibleForTesting + static class KsqlFailedPrecondition extends RuntimeException { + KsqlFailedPrecondition(final String error) { + super(error); + } + } + + private static Admin createCommandTopicAdminClient( + final KsqlRestConfig ksqlRestConfig, + final KsqlConfig ksqlConfig + ) { + final Map adminClientConfigs = + new HashMap<>(ksqlConfig.getKsqlAdminClientConfigProps()); + adminClientConfigs.putAll(ksqlRestConfig.getCommandProducerProperties()); + return new DefaultKafkaClientSupplier().getAdmin(adminClientConfigs); + } + + private static ServiceContext buildServiceContext( + final Supplier> propertiesLoader + ) { + final Map properties = propertiesLoader.get(); + final Vertx vertx = Vertx.vertx( + new VertxOptions() + .setMaxWorkerExecuteTimeUnit(TimeUnit.MILLISECONDS) + .setMaxWorkerExecuteTime(Long.MAX_VALUE)); + final KsqlClient sharedClient = InternalKsqlClientFactory.createInternalClient( + properties, + SocketAddress::inetSocketAddress, + vertx + ); + final KsqlConfig ksqlConfig = new KsqlConfig(properties); + final Supplier schemaRegistryClientFactory = + new KsqlSchemaRegistryClientFactory(ksqlConfig, Collections.emptyMap())::get; + final ConnectClientFactory connectClientFactory = new DefaultConnectClientFactory(ksqlConfig); + return new LazyServiceContext(() -> RestServiceContextFactory.create( + ksqlConfig, + Optional.empty(), + schemaRegistryClientFactory, + connectClientFactory, + sharedClient, + Collections.emptyList(), + Optional.empty() + )); + } +} diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/PreconditionServer.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/PreconditionServer.java new file mode 100644 index 000000000000..9931b41cb01a --- /dev/null +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/PreconditionServer.java @@ -0,0 +1,174 @@ +/* + * Copyright 2022 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.server; + +import com.google.common.collect.ImmutableList; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import io.confluent.ksql.api.server.PreconditionVerticle; +import io.confluent.ksql.api.util.ApiServerUtils; +import io.confluent.ksql.rest.server.state.ServerState; +import io.confluent.ksql.util.FileWatcher; +import io.confluent.ksql.util.KsqlException; +import io.confluent.ksql.util.VertxCompletableFuture; +import io.vertx.core.Verticle; +import io.vertx.core.Vertx; +import io.vertx.core.http.ClientAuth; +import io.vertx.core.http.HttpServerOptions; +import java.net.URI; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PreconditionServer { + private static final Logger log + = LoggerFactory.getLogger(io.confluent.ksql.api.server.Server.class); + + private final Vertx vertx; + private final KsqlRestConfig config; + private final Set deploymentIds = new HashSet<>(); + private final ServerState serverState; + private final List listeners; + private FileWatcher fileWatcher; + + @SuppressFBWarnings(value = "EI_EXPOSE_REP2") + public PreconditionServer( + final Vertx vertx, + final KsqlRestConfig config, + final ServerState serverState + ) { + this.vertx = Objects.requireNonNull(vertx); + this.config = Objects.requireNonNull(config); + this.serverState = Objects.requireNonNull(serverState); + this.listeners = ImmutableList.copyOf(ApiServerUtils.parseListeners(config)); + } + + public synchronized void start() { + if (!deploymentIds.isEmpty()) { + throw new IllegalStateException("Already started"); + } + final int idleConnectionTimeoutSeconds = + config.getInt(KsqlRestConfig.IDLE_CONNECTION_TIMEOUT_SECONDS); + + fileWatcher = ApiServerUtils.configureTlsCertReload(config, this::restart); + + + final int instances = config.getInt(KsqlRestConfig.VERTICLE_INSTANCES); + log.debug("Deploying " + instances + " instances of server verticle"); + + final List> deployFutures = new ArrayList<>(); + for (URI listener : listeners) { + + for (int i = 0; i < instances; i++) { + final VertxCompletableFuture vcf = new VertxCompletableFuture<>(); + final Verticle serverVerticle = new PreconditionVerticle( + createHttpServerOptions(config, listener.getHost(), listener.getPort(), + listener.getScheme().equalsIgnoreCase("https"), + idleConnectionTimeoutSeconds), + serverState + ); + vertx.deployVerticle(serverVerticle, vcf); + deployFutures.add(vcf); + } + } + + final CompletableFuture allDeployFuture = CompletableFuture.allOf(deployFutures + .toArray(new CompletableFuture[0])); + + try { + allDeployFuture.get(); + for (CompletableFuture deployFuture : deployFutures) { + deploymentIds.add(deployFuture.get()); + } + } catch (Exception e) { + throw new KsqlException("Failed to start API server", e); + } + log.info("API server started"); + listeners.forEach( + l -> log.info("Listening on: " + l.toString()) + ); + } + + public synchronized boolean started() { + return !deploymentIds.isEmpty(); + } + + public synchronized void stop() { + if (deploymentIds.isEmpty()) { + throw new IllegalStateException("Not started"); + } + if (fileWatcher != null) { + fileWatcher.shutdown(); + } + final List> undeployFutures = new ArrayList<>(); + for (String deploymentID : deploymentIds) { + final VertxCompletableFuture future = new VertxCompletableFuture<>(); + vertx.undeploy(deploymentID, future); + undeployFutures.add(future); + } + try { + CompletableFuture.allOf(undeployFutures.toArray(new CompletableFuture[0])).get(); + } catch (Exception e) { + throw new KsqlException("Failure in stopping API server", e); + } + deploymentIds.clear(); + log.info("API server stopped"); + } + + public List getListeners() { + return ImmutableList.copyOf(listeners); + } + + private void restart() { + log.info("Restarting precondition server"); + stop(); + start(); + } + + private static HttpServerOptions createHttpServerOptions( + final KsqlRestConfig ksqlRestConfig, + final String host, + final int port, + final boolean tls, + final int idleTimeoutSeconds + ) { + + final HttpServerOptions options = new HttpServerOptions() + .setHost(host) + .setPort(port) + .setReuseAddress(true) + .setReusePort(true) + .setIdleTimeout(idleTimeoutSeconds).setIdleTimeoutUnit(TimeUnit.SECONDS) + .setPerMessageWebSocketCompressionSupported(true) + .setPerFrameWebSocketCompressionSupported(true); + + if (tls) { + final String ksConfigName = KsqlRestConfig.KSQL_SSL_KEYSTORE_ALIAS_EXTERNAL_CONFIG; + final ClientAuth clientAuth = ksqlRestConfig.getClientAuth(); + + final String alias = ksqlRestConfig.getString(ksConfigName); + ApiServerUtils.setTlsOptions(ksqlRestConfig, options, alias, clientAuth); + } + return options; + } +} + + diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/ServerOptions.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/ServerOptions.java index 17da32c32f85..76d23bca507d 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/ServerOptions.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/ServerOptions.java @@ -19,13 +19,14 @@ import com.github.rvesse.airline.annotations.Arguments; import com.github.rvesse.airline.annotations.Command; import com.github.rvesse.airline.annotations.Option; -import com.github.rvesse.airline.annotations.restrictions.Once; import com.github.rvesse.airline.annotations.restrictions.Required; import io.confluent.ksql.rest.util.OptionsParser; import java.io.File; import java.io.IOException; +import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.stream.Collectors; import javax.inject.Inject; @Command(name = "server", description = "KSQL Cluster") @@ -38,14 +39,13 @@ public class ServerOptions { public HelpOption help; @SuppressWarnings("unused") // Accessed via reflection - @Once @Required @Arguments( title = "config-file", description = "A file specifying configs for the KSQL Server, KSQL, " + "and its underlying Kafka Streams instance(s). Refer to KSQL " + "documentation for a list of available configs.") - private String propertiesFile; + private List propertiesFile; @SuppressWarnings("unused") // Accessed via reflection @Option( @@ -53,8 +53,8 @@ public class ServerOptions { description = "Path to the query file on the local machine.") private String queriesFile; - File getPropertiesFile() { - return new File(propertiesFile); + List getPropertiesFile() { + return propertiesFile.stream().map(File::new).collect(Collectors.toList()); } Optional getQueriesFile(final Map properties) { diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/PreconditionCheckerIntegrationTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/PreconditionCheckerIntegrationTest.java new file mode 100644 index 000000000000..04d593630a14 --- /dev/null +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/PreconditionCheckerIntegrationTest.java @@ -0,0 +1,146 @@ +/* + * Copyright 2022 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.integration; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +import com.google.common.collect.ImmutableMap; +import io.confluent.common.utils.IntegrationTest; +import io.confluent.ksql.rest.client.KsqlRestClient; +import io.confluent.ksql.rest.client.RestResponse; +import io.confluent.ksql.rest.entity.KsqlErrorMessage; +import io.confluent.ksql.rest.server.KsqlRestConfig; +import io.confluent.ksql.rest.server.PreconditionChecker; +import io.confluent.ksql.rest.server.state.ServerState; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpMethod; +import io.vertx.core.http.HttpVersion; +import io.vertx.ext.web.client.HttpResponse; +import java.time.Duration; +import java.util.Map; +import java.util.Optional; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({IntegrationTest.class}) +public class PreconditionCheckerIntegrationTest { + private static final Map PROPERTIES = ImmutableMap.of( + KsqlRestConfig.KSQL_SERVER_PRECONDITIONS, + PreconditionCheckerIntegrationTestPrecondition.class.getCanonicalName() + ); + + private final ServerState serverState = new ServerState(); + + private PreconditionChecker checker; + + @Before + public void setup() { + PreconditionCheckerIntegrationTestPrecondition.ACTION.set(() -> Optional.of(new KsqlErrorMessage(123, "oops"))); + checker = new PreconditionChecker( + () -> PROPERTIES, + serverState + ); + checker.startAsync(); + } + + @After + public void cleanup() { + checker.shutdown(); + } + + @Test + public void shouldReturn503WhilePreconditionsDontPass() { + // Given: + final KsqlRestClient client = buildKsqlClient(); + + // When: + final RestResponse rsp = client.getServerInfo(); + + // Then: + assertThat(rsp.getStatusCode(), equalTo(503)); + } + + @Test + public void shouldExitAwaitOncePreconditionPasses() throws InterruptedException { + // Given: + final Thread waiter = new Thread(() -> checker.awaitTerminated()); + waiter.start(); + PreconditionCheckerIntegrationTestPrecondition.ACTION.set(Optional::empty); + + // When: + waiter.join(Duration.ofSeconds(30).toMillis()); + + // Then: + assertThat(waiter.isAlive(), is(false)); + } + + @Test + public void shouldExitWhenNotifiedToTerminate() throws InterruptedException { + // Given: + final Thread waiter = new Thread(() -> checker.awaitTerminated()); + waiter.start(); + checker.notifyTerminated(); + + // When: + waiter.join(Duration.ofSeconds(30).toMillis()); + + // Then: + assertThat(waiter.isAlive(), is(false)); + } + + @Test + public void shouldReturn200ForReadyEndpoint() { + shouldReturn200ForHealthcheckEndpoint("ready"); + } + + @Test + public void shouldReturn200ForLivenessEndpoint() { + shouldReturn200ForHealthcheckEndpoint("live"); + } + + private void shouldReturn200ForHealthcheckEndpoint(final String subPath) { + // When: + HttpResponse resp = RestIntegrationTestUtil.rawRestRequest( + checker.getListeners().get(0), + HttpVersion.HTTP_1_1, + HttpMethod.GET, + "/chc/" + subPath, + null, + "application/json", + Optional.empty(), + Optional.empty() + ); + + // Then: + assertThat(resp.statusCode(), equalTo(200)); + assertThat(resp.bodyAsString(), equalTo("{}")); + } + + public KsqlRestClient buildKsqlClient() { + return KsqlRestClient.create( + checker.getListeners().get(0).toString(), + ImmutableMap.of(), + ImmutableMap.of(), + Optional.empty(), + Optional.empty() + ); + } + +} diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/PreconditionCheckerIntegrationTestPrecondition.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/PreconditionCheckerIntegrationTestPrecondition.java new file mode 100644 index 000000000000..3bf09e7e7cf1 --- /dev/null +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/PreconditionCheckerIntegrationTestPrecondition.java @@ -0,0 +1,40 @@ +/* + * Copyright 2022 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.integration; + +import io.confluent.ksql.rest.entity.KsqlErrorMessage; +import io.confluent.ksql.rest.server.KsqlServerPrecondition; +import io.confluent.ksql.services.KafkaTopicClient; +import io.confluent.ksql.services.ServiceContext; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; + +public class PreconditionCheckerIntegrationTestPrecondition implements KsqlServerPrecondition { + + static AtomicReference>> ACTION + = new AtomicReference<>(Optional::empty); + + @Override + public Optional checkPrecondition( + Map properties, + ServiceContext serviceContext, + KafkaTopicClient internalTopicClient + ) { + return ACTION.get().get(); + } +} diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/PreconditionFunctionalTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/PreconditionFunctionalTest.java deleted file mode 100644 index b29e4ca3d9ef..000000000000 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/PreconditionFunctionalTest.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Copyright 2018 Confluent Inc. - * - * Licensed under the Confluent Community License (the "License"); you may not use - * this file except in compliance with the License. You may obtain a copy of the - * License at - * - * http://www.confluent.io/confluent-community-license - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package io.confluent.ksql.rest.integration; - -import static io.netty.handler.codec.http.HttpResponseStatus.OK; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.is; - -import io.confluent.common.utils.IntegrationTest; -import io.confluent.ksql.integration.IntegrationTestHarness; -import io.confluent.ksql.integration.Retry; -import io.confluent.ksql.rest.entity.KsqlErrorMessage; -import io.confluent.ksql.rest.server.KsqlRestConfig; -import io.confluent.ksql.rest.server.KsqlServerPrecondition; -import io.confluent.ksql.rest.server.TestKsqlRestAppWaitingOnPrecondition; -import io.confluent.ksql.services.KafkaTopicClient; -import io.confluent.ksql.services.ServiceContext; -import io.vertx.core.buffer.Buffer; -import io.vertx.core.http.HttpMethod; -import io.vertx.core.http.HttpVersion; -import io.vertx.core.json.JsonObject; -import io.vertx.ext.web.client.HttpResponse; -import java.util.Optional; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import kafka.zookeeper.ZooKeeperClientException; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.rules.RuleChain; - -@Category({IntegrationTest.class}) -public class PreconditionFunctionalTest { - - private static final String SERVER_PRECONDITIONS_CONFIG = "ksql.server.preconditions"; - - private static final int CUSTOM_ERROR_CODE = 50370; - private static final CountDownLatch latch = new CountDownLatch(1); - - private static final IntegrationTestHarness TEST_HARNESS = IntegrationTestHarness.build(); - - private static final TestKsqlRestAppWaitingOnPrecondition REST_APP = TestKsqlRestAppWaitingOnPrecondition - .builder(TEST_HARNESS::kafkaBootstrapServers) - .withStaticServiceContext(TEST_HARNESS::getServiceContext) - .withProperty( - SERVER_PRECONDITIONS_CONFIG, - "io.confluent.ksql.rest.integration.PreconditionFunctionalTest$TestFailedPrecondition") - .buildWaitingOnPrecondition(latch); - - @ClassRule - public static final RuleChain CHAIN = RuleChain - .outerRule(Retry.of(3, ZooKeeperClientException.class, 3, TimeUnit.SECONDS)) - .around(TEST_HARNESS) - .around(REST_APP); - - @BeforeClass - public static void setUpClass() { - REST_APP.startAndWaitForPrecondition(); - } - - @Test - public void shouldNotServeRequestsWhileWaitingForPrecondition() { - // When: - final KsqlErrorMessage error = - RestIntegrationTestUtil.makeKsqlRequestWithError(REST_APP, "SHOW STREAMS;"); - - // Then: - assertThat(error.getErrorCode(), is(CUSTOM_ERROR_CODE)); - } - - @Test - public void shouldServeClusterHealthCheckReadyRequestsWhileWaitingForPrecondition() { - shouldServeClusterHealthCheckRequestsWhileWaitingForPrecondition("ready"); - } - - @Test - public void shouldServeClusterHealthCheckLiveRequestsWhileWaitingForPrecondition() { - shouldServeClusterHealthCheckRequestsWhileWaitingForPrecondition("live"); - } - - private void shouldServeClusterHealthCheckRequestsWhileWaitingForPrecondition(String subPath) { - - // When: - HttpResponse resp = RestIntegrationTestUtil.rawRestRequest( - REST_APP, HttpVersion.HTTP_1_1, HttpMethod.GET, "/chc/" + subPath, null, Optional.empty()); - - // Then: - assertThat(resp.statusCode(), is(OK.code())); - JsonObject jsonObject = new JsonObject(resp.body()); - assertThat(jsonObject.isEmpty(), is(true)); - } - - public static class TestFailedPrecondition implements KsqlServerPrecondition { - - private boolean first = true; - - @Override - public Optional checkPrecondition( - final KsqlRestConfig restConfig, - final ServiceContext serviceContext, - final KafkaTopicClient topicClient) { - return fail(); - } - - private Optional fail() { - if (first) { - latch.countDown(); - first = false; - } - return Optional.of(new KsqlErrorMessage(CUSTOM_ERROR_CODE, "purposefully failed precondition")); - } - } -} \ No newline at end of file diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestIntegrationTestUtil.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestIntegrationTestUtil.java index 4c4d91cb0e10..84a38172411d 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestIntegrationTestUtil.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestIntegrationTestUtil.java @@ -298,13 +298,35 @@ static HttpResponse rawRestRequest( final String mediaType, final Optional> writeStream, final Optional credentials + ) { + return rawRestRequest( + restApp.getHttpListener(), + httpVersion, + method, + uri, + requestBody, + mediaType, + writeStream, + credentials + ); + } + + static HttpResponse rawRestRequest( + final URI listener, + final HttpVersion httpVersion, + final HttpMethod method, + final String uri, + final Object requestBody, + final String mediaType, + final Optional> writeStream, + final Optional credentials ) { Vertx vertx = Vertx.vertx(); WebClient webClient = null; try { WebClientOptions webClientOptions = new WebClientOptions() - .setDefaultHost(restApp.getHttpListener().getHost()) - .setDefaultPort(restApp.getHttpListener().getPort()) + .setDefaultHost(listener.getHost()) + .setDefaultPort(listener.getPort()) .setFollowRedirects(false); if (httpVersion == HttpVersion.HTTP_2) { @@ -313,10 +335,7 @@ static HttpResponse rawRestRequest( webClient = WebClient.create(vertx, webClientOptions); return rawRestRequest( - vertx, webClient, - restApp, - httpVersion, method, uri, requestBody, @@ -333,10 +352,7 @@ static HttpResponse rawRestRequest( } static HttpResponse rawRestRequest( - final Vertx vertx, final WebClient webClient, - final TestKsqlRestApp restApp, - final HttpVersion httpVersion, final HttpMethod method, final String uri, final Object requestBody, diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java index 0b603f86d2a8..a4e8b6cb3673 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java @@ -27,7 +27,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableList; @@ -37,12 +36,9 @@ import io.confluent.ksql.logging.processing.ProcessingLogContext; import io.confluent.ksql.logging.processing.ProcessingLogServerUtils; import io.confluent.ksql.metrics.MetricCollectors; -import io.confluent.ksql.parser.KsqlParser.ParsedStatement; -import io.confluent.ksql.parser.KsqlParser.PreparedStatement; import io.confluent.ksql.properties.DenyListPropertyValidator; import io.confluent.ksql.rest.EndpointResponse; import io.confluent.ksql.rest.entity.KsqlEntityList; -import io.confluent.ksql.rest.entity.KsqlErrorMessage; import io.confluent.ksql.rest.entity.KsqlRequest; import io.confluent.ksql.rest.entity.SourceInfo; import io.confluent.ksql.rest.entity.StreamsList; @@ -62,17 +58,14 @@ import io.confluent.ksql.version.metrics.VersionCheckerAgent; import io.vertx.core.Vertx; import java.util.Collections; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Queue; import java.util.function.Consumer; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.common.metrics.MetricsReporter; import org.apache.kafka.streams.StreamsConfig; -import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -118,10 +111,6 @@ public class KsqlRestApplicationTest { @Mock private KafkaTopicClient topicClient; @Mock - private KsqlServerPrecondition precondition1; - @Mock - private KsqlServerPrecondition precondition2; - @Mock private Consumer rocksDBConfigSetterHandler; @Mock private HeartbeatAgent heartbeatAgent; @@ -170,9 +159,6 @@ public void setUp() { when(ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG)).thenReturn("ksql-id"); when(ksqlConfig.getKsqlStreamConfigProps()).thenReturn(ImmutableMap.of("state.dir", "/tmp/cat")); - when(precondition1.checkPrecondition(any(), any(), any())).thenReturn(Optional.empty()); - when(precondition2.checkPrecondition(any(), any(), any())).thenReturn(Optional.empty()); - when(response.getStatus()).thenReturn(200); when(response.getEntity()).thenReturn(new KsqlEntityList( Collections.singletonList(new StreamsList( @@ -354,51 +340,6 @@ public void shouldSendCreateStreamRequestBeforeSettingReady() { inOrder.verify(serverState).setReady(); } - @Test - public void shouldCheckPreconditionsBeforeUsingServiceContext() { - // Given: - when(precondition2.checkPrecondition(any(), any(), any())).then(a -> { - verifyNoMoreInteractions(serviceContext); - return Optional.empty(); - }); - - // When: - app.startKsql(ksqlConfig); - - // Then: - final InOrder inOrder = Mockito.inOrder(precondition1, precondition2, serviceContext); - inOrder.verify(precondition1).checkPrecondition(restConfig, serviceContext, internalTopicClient); - inOrder.verify(precondition2).checkPrecondition(restConfig, serviceContext, internalTopicClient); - } - - @Test - public void shouldNotInitializeUntilPreconditionsChecked() { - // Given: - final KsqlErrorMessage error1 = new KsqlErrorMessage(50000, "error1"); - final KsqlErrorMessage error2 = new KsqlErrorMessage(50000, "error2"); - final Queue errors = new LinkedList<>(); - errors.add(error1); - errors.add(error2); - when(precondition2.checkPrecondition(any(), any(), any())).then(a -> { - verifyNoMoreInteractions(serviceContext); - return Optional.ofNullable(errors.isEmpty() ? null : errors.remove()); - }); - - // When: - app.startKsql(ksqlConfig); - - // Then: - final InOrder inOrder = Mockito.inOrder(precondition1, precondition2, serverState); - inOrder.verify(precondition1).checkPrecondition(restConfig, serviceContext, internalTopicClient); - inOrder.verify(precondition2).checkPrecondition(restConfig, serviceContext, internalTopicClient); - inOrder.verify(serverState).setInitializingReason(error1); - inOrder.verify(precondition1).checkPrecondition(restConfig, serviceContext, internalTopicClient); - inOrder.verify(precondition2).checkPrecondition(restConfig, serviceContext, internalTopicClient); - inOrder.verify(serverState).setInitializingReason(error2); - inOrder.verify(precondition1).checkPrecondition(restConfig, serviceContext, internalTopicClient); - inOrder.verify(precondition2).checkPrecondition(restConfig, serviceContext, internalTopicClient); - } - @Test public void shouldConfigureRocksDBConfigSetter() { // When: @@ -471,7 +412,6 @@ private void givenAppWithRestConfig( Optional.empty(), serverState, processingLogContext, - ImmutableList.of(precondition1, precondition2), ImmutableList.of(ksqlEngine, ksqlResource), rocksDBConfigSetterHandler, Optional.of(heartbeatAgent), diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlServerMainTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlServerMainTest.java index 4724bd45ebd7..9a31301a71fa 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlServerMainTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlServerMainTest.java @@ -50,13 +50,15 @@ public class KsqlServerMainTest { @Mock(MockType.NICE) private Executable executable; @Mock(MockType.NICE) + private Executable precondition; + @Mock(MockType.NICE) private Executor shutdownHandler; private final File mockStreamsStateDir = mock(File.class); @Before public void setUp() { - main = new KsqlServerMain(executable, shutdownHandler); + main = new KsqlServerMain(precondition, () -> executable, shutdownHandler); when(mockStreamsStateDir.exists()).thenReturn(true); when(mockStreamsStateDir.mkdirs()).thenReturn(true); when(mockStreamsStateDir.isDirectory()).thenReturn(true); @@ -106,9 +108,10 @@ public void shouldNotifyAppOnTerminate() throws Exception { // Given: final Capture captureShutdownHandler = newCapture(); shutdownHandler.execute(capture(captureShutdownHandler)); - executable.notifyTerminated(); expectLastCall(); - replay(shutdownHandler, executable); + precondition.notifyTerminated(); + expectLastCall(); + replay(shutdownHandler, precondition); main.tryStartApp(); final Runnable handler = captureShutdownHandler.getValue(); @@ -116,7 +119,7 @@ public void shouldNotifyAppOnTerminate() throws Exception { handler.run(); // Then: - verify(executable); + verify(precondition); } @Test diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/PreconditionCheckerTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/PreconditionCheckerTest.java new file mode 100644 index 000000000000..8b708b1290eb --- /dev/null +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/PreconditionCheckerTest.java @@ -0,0 +1,204 @@ +/* + * Copyright 2022 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.server; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.confluent.ksql.rest.entity.KsqlErrorMessage; +import io.confluent.ksql.rest.server.state.ServerState; +import io.confluent.ksql.services.KafkaTopicClient; +import io.confluent.ksql.services.ServiceContext; +import io.vertx.core.Vertx; +import java.util.Map; +import java.util.Optional; +import java.util.function.Supplier; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class PreconditionCheckerTest { + private static final KsqlErrorMessage ERROR = new KsqlErrorMessage(123, "oops"); + private static final Map PROPERTIES = ImmutableMap.of(); + + @Mock + private PreconditionServer server; + @Mock + private KsqlServerPrecondition precondition1; + @Mock + private KsqlServerPrecondition precondition2; + @Mock + private Vertx vertx; + @Mock + private ServiceContext serviceContext; + @Mock + private KafkaTopicClient topicClient; + @Mock + private ServerState serverState; + @Mock + private Supplier> propertiesLoader; + + private PreconditionChecker checker; + + private final KsqlRestConfig restConfig = new KsqlRestConfig(PROPERTIES); + + @Before + public void setup() { + checker = new PreconditionChecker( + propertiesLoader, + serviceContext, + restConfig, + topicClient, + vertx, + ImmutableList.of(precondition1, precondition2), + server, + serverState + ); + when(precondition1.checkPrecondition(any(), any(), any())).thenReturn(Optional.empty()); + when(precondition2.checkPrecondition(any(), any(), any())).thenReturn(Optional.empty()); + when(server.started()).thenReturn(true); + when(propertiesLoader.get()).thenReturn(PROPERTIES); + } + + @Test + public void shouldNotStartServerIfNoPreconditions() { + // Given: + PreconditionChecker checker = new PreconditionChecker( + propertiesLoader, + serviceContext, + restConfig, + topicClient, + vertx, + ImmutableList.of(), + server, + serverState + ); + + // When: + checker.startAsync(); + + // Then: + verifyNoInteractions(server); + } + + @Test + public void shouldNotStartServerIfPreconditionsPass() { + // When: + checker.startAsync(); + + // Then: + verifyNoInteractions(server); + } + + @Test + public void shouldStartServerIfPreconditionsDoNotPass() { + // Given: + givenPreconditionFailures(precondition1); + + // When: + checker.startAsync(); + + // Then: + verify(server).start(); + } + + @Test + public void shouldUpdateServerStateOnFailure() { + // Given: + givenPreconditionFailures(precondition1); + + // When: + checker.startAsync(); + checker.awaitTerminated(); + + // Then: + verify(serverState).setInitializingReason(ERROR); + } + + @Test + public void shouldRecheckPreconditions() { + // Given: + givenPreconditionFailures(precondition1); + + // When: + checker.startAsync(); + checker.awaitTerminated(); + + // Then: + verify(precondition1, times(3)) + .checkPrecondition(PROPERTIES, serviceContext, topicClient); + } + + @Test + public void shouldPassFreshPropertiesToPreconditions() { + // Given: + givenPreconditionFailures(precondition1); + final Map properties1 = ImmutableMap.of("a", "b"); + final Map properties2 = ImmutableMap.of("c", "d"); + final Map properties3 = ImmutableMap.of("e", "f"); + when(propertiesLoader.get()) + .thenReturn(properties1) + .thenReturn(properties2) + .thenReturn(properties3); + + // When: + checker.startAsync(); + checker.awaitTerminated(); + + // Then: + verify(precondition1).checkPrecondition(properties1, serviceContext, topicClient); + verify(precondition1).checkPrecondition(properties2, serviceContext, topicClient); + verify(precondition1).checkPrecondition(properties3, serviceContext, topicClient); + } + + @Test + public void shouldCloseInners() { + // When: + checker.shutdown(); + + // Then: + verify(server).stop(); + verify(vertx).close(); + } + + @Test + public void shouldNotStopServerIfNotStarted() { + // Given: + when(server.started()).thenReturn(false); + + // When: + checker.shutdown(); + + // Then: + verify(server, never()).stop(); + } + + void givenPreconditionFailures(final KsqlServerPrecondition precondition) { + when(precondition.checkPrecondition(any(), any(), any())) + .thenReturn(Optional.of(ERROR)) + .thenReturn(Optional.of(ERROR)) + .thenReturn(Optional.empty()); + } +} \ No newline at end of file diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/TestKsqlRestApp.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/TestKsqlRestApp.java index cd84060edd6a..78897249b0d7 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/TestKsqlRestApp.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/TestKsqlRestApp.java @@ -41,6 +41,7 @@ import io.confluent.ksql.rest.server.services.TestDefaultKsqlClientFactory; import io.confluent.ksql.rest.server.services.TestRestServiceContextFactory; import io.confluent.ksql.rest.server.services.TestRestServiceContextFactory.InternalSimpleKsqlClientFactory; +import io.confluent.ksql.rest.server.state.ServerState; import io.confluent.ksql.services.DisabledKsqlClient; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.services.ServiceContextFactory; @@ -337,6 +338,7 @@ protected void initialize() { ksqlRestApplication = KsqlRestApplication.buildApplication( metricsPrefix, ksqlRestConfig, + new ServerState(), (booleanSupplier) -> mock(VersionCheckerAgent.class), 3, serviceContext.get(), @@ -691,16 +693,5 @@ public TestKsqlRestApp build() { internalSimpleKsqlClientFactory ); } - - public TestKsqlRestAppWaitingOnPrecondition buildWaitingOnPrecondition(final CountDownLatch latch) { - return new TestKsqlRestAppWaitingOnPrecondition( - bootstrapServers, - additionalProps, - serviceContext, - credentials, - latch, - internalSimpleKsqlClientFactory - ); - } } } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/TestKsqlRestAppWaitingOnPrecondition.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/TestKsqlRestAppWaitingOnPrecondition.java deleted file mode 100644 index ab4b7ff1ca63..000000000000 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/TestKsqlRestAppWaitingOnPrecondition.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Copyright 2020 Confluent Inc. - * - * Licensed under the Confluent Community License (the "License"); you may not use - * this file except in compliance with the License. You may obtain a copy of the - * License at - * - * http://www.confluent.io/confluent-community-license - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package io.confluent.ksql.rest.server; - -import io.confluent.ksql.rest.client.BasicCredentials; -import io.confluent.ksql.rest.server.services.TestRestServiceContextFactory.InternalSimpleKsqlClientFactory; -import io.confluent.ksql.services.ServiceContext; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.CountDownLatch; -import java.util.function.Supplier; - -/** - * A {@link TestKsqlRestApp} for testing behavior of a server stuck waiting on a precondition. - * - * The server is not started automatically. Rather, {@code startAndWaitForPrecondition()} should - * be called by the test suite. A {@code CountDownLatch} provided in the constructor counts down - * when a precondition check is called, in order to finish configuring the app once the server - * is waiting for preconditions. - */ -public class TestKsqlRestAppWaitingOnPrecondition extends TestKsqlRestApp { - - private CountDownLatch latch; - - TestKsqlRestAppWaitingOnPrecondition( - final Supplier bootstrapServers, - final Map additionalProps, - final Supplier serviceContext, - final Optional credentials, - final CountDownLatch latch, - final InternalSimpleKsqlClientFactory internalSimpleKsqlClientFactory - ) { - super(bootstrapServers, additionalProps, serviceContext, credentials, - internalSimpleKsqlClientFactory); - this.latch = latch; - } - - @Override - protected void before() { - initialize(); - } - - public void startAndWaitForPrecondition() { - try { - new Thread(() -> { - try { - ksqlRestApplication.startAsync(); - } catch (Exception e) { - throw new RuntimeException("Error starting server", e); - } - }).start(); - latch.await(); - } catch (final Exception var2) { - throw new RuntimeException("Failed to start Ksql rest server", var2); - } - - listeners.addAll(ksqlRestApplication.getListeners()); - ksqlEngine = ksqlRestApplication.getEngine(); - } -}