Skip to content

Commit

Permalink
fix: add back configs for setting TLS protocols and cipher suites (#6558
Browse files Browse the repository at this point in the history
)
  • Loading branch information
vcrfxia authored Nov 4, 2020
1 parent a2e893c commit cf7de69
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,33 @@ private static void setTlsOptions(
) {
options.setUseAlpn(true).setSsl(true);

configureTlsKeyStore(ksqlRestConfig, options, keyStoreAlias);
configureTlsTrustStore(ksqlRestConfig, options);

final List<String> enabledProtocols =
ksqlRestConfig.getList(KsqlRestConfig.SSL_ENABLED_PROTOCOLS_CONFIG);
if (!enabledProtocols.isEmpty()) {
options.setEnabledSecureTransportProtocols(new HashSet<>(enabledProtocols));
}

final List<String> cipherSuites =
ksqlRestConfig.getList(KsqlRestConfig.SSL_CIPHER_SUITES_CONFIG);
if (!cipherSuites.isEmpty()) {
// Vert.x does not yet support a method for setting cipher suites, so we use the following
// workaround instead. See https://github.com/eclipse-vertx/vert.x/issues/1507.
final Set<String> enabledCipherSuites = options.getEnabledCipherSuites();
enabledCipherSuites.clear();
enabledCipherSuites.addAll(cipherSuites);
}

options.setClientAuth(clientAuth);
}

private static void configureTlsKeyStore(
final KsqlRestConfig ksqlRestConfig,
final HttpServerOptions options,
final String keyStoreAlias
) {
final String keyStorePath = ksqlRestConfig
.getString(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG);
final Password keyStorePassword = ksqlRestConfig
Expand All @@ -341,17 +368,27 @@ private static void setTlsOptions(
new PfxOptions().setPath(keyStorePath).setPassword(keyStorePassword.value()));
}
}
}

private static void configureTlsTrustStore(
final KsqlRestConfig ksqlRestConfig,
final HttpServerOptions options
) {
final String trustStorePath = ksqlRestConfig
.getString(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG);
final Password trustStorePassword = ksqlRestConfig
.getPassword(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG);
if (trustStorePath != null && !trustStorePath.isEmpty()) {
options.setTrustStoreOptions(
new JksOptions().setPath(trustStorePath).setPassword(trustStorePassword.value()));
final String trustStoreType =
ksqlRestConfig.getString(KsqlRestConfig.SSL_TRUSTSTORE_TYPE_CONFIG);
if (trustStoreType.equals(KsqlRestConfig.SSL_STORE_TYPE_JKS)) {
options.setTrustStoreOptions(
new JksOptions().setPath(trustStorePath).setPassword(trustStorePassword.value()));
} else if (trustStoreType.equals(KsqlRestConfig.SSL_STORE_TYPE_PKCS12)) {
options.setPfxTrustOptions(
new PfxOptions().setPath(trustStorePath).setPassword(trustStorePassword.value()));
}
}

options.setClientAuth(clientAuth);
}

private static List<URI> parseListeners(final KsqlRestConfig config) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,17 @@ public class KsqlRestConfig extends AbstractConfig {
SSL_CLIENT_AUTHENTICATION_REQUIRED
);

public static final String SSL_ENABLED_PROTOCOLS_CONFIG = "ssl.enabled.protocols";
protected static final String SSL_ENABLED_PROTOCOLS_DOC =
"The list of protocols enabled for SSL connections. Comma-separated list. "
+ "If blank, the default from the Apache Kafka SslConfigs.java file will be used "
+ "(see 'DEFAULT_SSL_ENABLED_PROTOCOLS' in "
+ "https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java).";

public static final String SSL_CIPHER_SUITES_CONFIG = "ssl.cipher.suites";
protected static final String SSL_CIPHER_SUITES_DOC =
"A list of SSL cipher suites. If blank, the JVM default will be used.";

public static final String SSL_KEYSTORE_RELOAD_CONFIG = "ssl.keystore.reload";
protected static final String SSL_KEYSTORE_RELOAD_DOC =
"Enable auto reload of ssl keystore.";
Expand Down Expand Up @@ -448,15 +459,25 @@ public class KsqlRestConfig extends AbstractConfig {
"",
Importance.MEDIUM,
KSQL_SSL_KEYSTORE_ALIAS_EXTERNAL_DOC
)
.define(
).define(
KSQL_SSL_KEYSTORE_ALIAS_INTERNAL_CONFIG,
Type.STRING,
"",
Importance.MEDIUM,
KSQL_SSL_KEYSTORE_ALIAS_INTERNAL_DOC
)
.define(
).define(
SSL_ENABLED_PROTOCOLS_CONFIG,
Type.LIST,
SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS,
Importance.MEDIUM,
SSL_ENABLED_PROTOCOLS_DOC
).define(
SSL_CIPHER_SUITES_CONFIG,
Type.LIST,
"",
Importance.LOW,
SSL_CIPHER_SUITES_DOC
).define(
ADVERTISED_LISTENER_CONFIG,
Type.STRING,
null,
Expand Down
29 changes: 29 additions & 0 deletions ksqldb-rest-app/src/test/java/io/confluent/ksql/api/TlsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,16 @@

import io.confluent.ksql.rest.server.KsqlRestConfig;
import io.confluent.ksql.test.util.secure.ServerKeyStore;
import io.confluent.ksql.util.VertxCompletableFuture;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpVersion;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.JksOptions;
import io.vertx.ext.web.client.HttpResponse;
import io.vertx.ext.web.client.WebClient;
import io.vertx.ext.web.client.WebClientOptions;
import java.nio.file.attribute.FileTime;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
Expand All @@ -54,6 +57,7 @@ protected KsqlRestConfig createServerConfig() {

Map<String, Object> config = new HashMap<>();
config.put(KsqlRestConfig.LISTENERS_CONFIG, "https://localhost:0");
config.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, "TLSv1.2");
config.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keyStorePath);
config.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keyStorePassword);
config.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStorePath);
Expand Down Expand Up @@ -85,6 +89,31 @@ protected WebClientOptions createClientOptions() {
setDefaultPort(server.getListeners().get(0).getPort());
}

@Test
public void shouldFailToUseDisabledTlsVersion() {
// Given
WebClientOptions clientOptions = createClientOptions()
.setEnabledSecureTransportProtocols(Collections.singleton("TLSv1.1"));
WebClient client = WebClient.create(vertx, clientOptions);

// When
JsonObject requestBody = new JsonObject().put("ksql", "show streams;");
VertxCompletableFuture<HttpResponse<Buffer>> requestFuture = new VertxCompletableFuture<>();
client
.post("/ksql")
.sendBuffer(requestBody.toBuffer(), requestFuture);

// Then
try {
requestFuture.get();
} catch (Exception e) {
assertThat(e,
instanceOf(ExecutionException.class)); // thrown from CompletableFuture.get()
assertThat(e.getMessage(), containsString(
"javax.net.ssl.SSLHandshakeException: Failed to create SSL connection"));
}
}

@Test
public void shouldReloadCert() throws Exception {
JsonObject requestBody = new JsonObject().put("sql", DEFAULT_PULL_QUERY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package io.confluent.ksql.rest.client;

import com.google.common.base.Strings;
import io.confluent.ksql.parser.json.KsqlTypesDeserializationModule;
import io.confluent.ksql.properties.LocalProperties;
import io.confluent.ksql.rest.ApiJsonMapper;
Expand Down Expand Up @@ -137,8 +138,10 @@ private static HttpClient createHttpClient(final Vertx vertx,
final HttpClientOptions httpClientOptions,
final boolean tls) {
if (tls) {
httpClientOptions.setVerifyHost(false);
httpClientOptions.setSsl(true);

configureHostVerification(clientProps, httpClientOptions);

final String trustStoreLocation = clientProps.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG);
if (trustStoreLocation != null) {
final String suppliedTruststorePassword = clientProps
Expand All @@ -147,10 +150,10 @@ private static HttpClient createHttpClient(final Vertx vertx,
.setPassword(suppliedTruststorePassword == null ? "" : suppliedTruststorePassword));
final String keyStoreLocation = clientProps.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG);
if (keyStoreLocation != null) {
final String suppliedKeyStorePassord = clientProps
final String suppliedKeyStorePassword = clientProps
.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG);
httpClientOptions.setKeyStoreOptions(new JksOptions().setPath(keyStoreLocation)
.setPassword(suppliedKeyStorePassord == null ? "" : suppliedKeyStorePassord));
.setPassword(suppliedKeyStorePassword == null ? "" : suppliedKeyStorePassword));
}
}
}
Expand All @@ -170,4 +173,24 @@ private static HttpClient createHttpClient(final Vertx vertx,
throw new KsqlRestClientException(e.getMessage(), e);
}
}

private static void configureHostVerification(
final Map<String, String> clientProps,
final HttpClientOptions httpClientOptions
) {
final String endpointIdentificationAlg =
clientProps.get(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG);
if (!Strings.isNullOrEmpty(endpointIdentificationAlg)) {
if (!endpointIdentificationAlg.toLowerCase().equals("https")) {
throw new IllegalArgumentException("Config '"
+ SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG
+ "' must be either 'https' or empty. Got: " + endpointIdentificationAlg);
}

httpClientOptions.setVerifyHost(true);
return;
}

httpClientOptions.setVerifyHost(false);
}
}

0 comments on commit cf7de69

Please sign in to comment.