From c564e63ac4b85357ff1d084d6e517900112e65c6 Mon Sep 17 00:00:00 2001 From: Alan Sheinberg Date: Mon, 13 Dec 2021 13:54:45 -0800 Subject: [PATCH 1/3] fix: Prevents internal http2 requests from having shared connections closed --- .../ksql/rest/server/KsqlRestConfig.java | 12 +++++++ .../services/InternalKsqlClientFactory.java | 35 ++++++++++++++++--- 2 files changed, 42 insertions(+), 5 deletions(-) diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java index fc3cf301cbc9..4df18f37737a 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java @@ -381,6 +381,12 @@ public class KsqlRestConfig extends AbstractConfig { + " otherwise be verbose. Note that this works on the entire URI, respecting the " + KSQL_ENDPOINT_LOGGING_LOG_QUERIES_CONFIG + " configuration"; + public static final String KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_CONFIG + = "ksql.internal.http2.max.pool.size"; + public static final int KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_DEFAULT = 1000; + public static final String KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_DOC = + "The maximum connection pool size used by Vertx for http2 internal connections"; + private static final ConfigDef CONFIG_DEF; static { @@ -722,6 +728,12 @@ public class KsqlRestConfig extends AbstractConfig { KSQL_ENDPOINT_LOGGING_LOG_QUERIES_DEFAULT, Importance.LOW, KSQL_ENDPOINT_LOGGING_LOG_QUERIES_DOC + ).define( + KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_CONFIG, + Type.INT, + KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_DEFAULT, + Importance.LOW, + KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_DOC ); } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/services/InternalKsqlClientFactory.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/services/InternalKsqlClientFactory.java index 36a41a1fb8eb..5d412ebaca11 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/services/InternalKsqlClientFactory.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/services/InternalKsqlClientFactory.java @@ -30,8 +30,11 @@ import java.util.Optional; import java.util.function.BiFunction; import java.util.function.Function; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public final class InternalKsqlClientFactory { + private static final Logger LOG = LoggerFactory.getLogger(InternalKsqlClientFactory.class); private InternalKsqlClientFactory() { @@ -59,9 +62,9 @@ public static KsqlClient createInternalClient( private static Function httpOptionsFactory( final Map clientProps, final boolean verifyHost, - final Function clientOptions) { + final BiFunction, Boolean, HttpClientOptions> clientOptions) { return (tls) -> { - final HttpClientOptions httpClientOptions = clientOptions.apply(tls); + final HttpClientOptions httpClientOptions = clientOptions.apply(clientProps, tls); if (!tls) { return httpClientOptions; } @@ -88,12 +91,34 @@ private static Function httpOptionsFactory( }; } - private static HttpClientOptions createClientOptions(final boolean tls) { + private static HttpClientOptions createClientOptions( + final Map clientProps, + final boolean tls + ) { return new HttpClientOptions().setMaxPoolSize(100); } - private static HttpClientOptions createClientOptionsHttp2(final boolean tls) { - return new HttpClientOptions().setHttp2MaxPoolSize(100).setProtocolVersion(HttpVersion.HTTP_2) + private static HttpClientOptions createClientOptionsHttp2( + final Map clientProps, + final boolean tls + ) { + final String size = clientProps.get( + KsqlRestConfig.KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_CONFIG); + int sizeInt; + try { + sizeInt = Integer.parseInt(size); + } catch (NumberFormatException e) { + LOG.error("Bad int passed in for " + KsqlRestConfig.KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_CONFIG + + ", using 1000", e); + sizeInt = 1000; + } + return new HttpClientOptions() + // At the moment, we cannot asynchronously end long-running queries in http2, in a way that + // we can with http1.1, by just closing the connection. For that reason, we've disabled + // multiplexing: https://github.com/confluentinc/ksql/issues/8505 + .setHttp2MultiplexingLimit(1) + .setHttp2MaxPoolSize(sizeInt) + .setProtocolVersion(HttpVersion.HTTP_2) .setUseAlpn(tls); } } From 8824a1baf35eff8ae151135b688aaa98dc106cb0 Mon Sep 17 00:00:00 2001 From: Alan Sheinberg Date: Mon, 13 Dec 2021 15:58:24 -0800 Subject: [PATCH 2/3] Feedback --- .../services/InternalKsqlClientFactory.java | 16 ++-- .../InternalKsqlClientFactoryTest.java | 86 +++++++++++++++++++ 2 files changed, 96 insertions(+), 6 deletions(-) create mode 100644 ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/services/InternalKsqlClientFactoryTest.java diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/services/InternalKsqlClientFactory.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/services/InternalKsqlClientFactory.java index 5d412ebaca11..060e4dc37e9c 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/services/InternalKsqlClientFactory.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/services/InternalKsqlClientFactory.java @@ -105,12 +105,16 @@ private static HttpClientOptions createClientOptionsHttp2( final String size = clientProps.get( KsqlRestConfig.KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_CONFIG); int sizeInt; - try { - sizeInt = Integer.parseInt(size); - } catch (NumberFormatException e) { - LOG.error("Bad int passed in for " + KsqlRestConfig.KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_CONFIG - + ", using 1000", e); - sizeInt = 1000; + if (size != null) { + try { + sizeInt = Integer.parseInt(size); + } catch (NumberFormatException e) { + LOG.error("Bad int passed in for " + KsqlRestConfig.KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_CONFIG + + ", using " + KsqlRestConfig.KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_DEFAULT, e); + sizeInt = KsqlRestConfig.KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_DEFAULT; + } + } else { + sizeInt = KsqlRestConfig.KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_DEFAULT; } return new HttpClientOptions() // At the moment, we cannot asynchronously end long-running queries in http2, in a way that diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/services/InternalKsqlClientFactoryTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/services/InternalKsqlClientFactoryTest.java new file mode 100644 index 000000000000..126c4fcaa1b6 --- /dev/null +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/services/InternalKsqlClientFactoryTest.java @@ -0,0 +1,86 @@ +package io.confluent.ksql.rest.server.services; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableMap; +import io.confluent.ksql.rest.client.KsqlClient; +import io.confluent.ksql.rest.server.KsqlRestConfig; +import io.vertx.core.Vertx; +import io.vertx.core.http.HttpClient; +import io.vertx.core.http.HttpClientOptions; +import io.vertx.core.net.SocketAddress; +import java.util.List; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class InternalKsqlClientFactoryTest { + + @Mock + private Vertx vertx; + @Mock + private HttpClient httpClient; + @Captor + private ArgumentCaptor options; + + @Before + public void setUp() { + when(vertx.createHttpClient(options.capture())).thenReturn(httpClient); + } + + @Test + public void shouldCreateClient() { + // When: + KsqlClient client = InternalKsqlClientFactory.createInternalClient( + ImmutableMap.of(), SocketAddress::inetSocketAddress, + vertx); + + // Then: + assertThat(client, notNullValue()); + } + + @Test + public void shouldCreateClient_http2() { + // When: + KsqlClient client = InternalKsqlClientFactory.createInternalClient( + ImmutableMap.of( + KsqlRestConfig.KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_CONFIG, "1234" + ), SocketAddress::inetSocketAddress, + vertx); + List optionsList = options.getAllValues(); + + + // Then: + assertThat(client, notNullValue()); + assertThat(optionsList.size(), is(4)); + HttpClientOptions sslOptions = optionsList.get(2); + assertThat(sslOptions.getHttp2MaxPoolSize(), is(1234)); + } + + @Test + public void shouldCreateClient_http2_badPoolSize() { + // When: + KsqlClient client = InternalKsqlClientFactory.createInternalClient( + ImmutableMap.of( + KsqlRestConfig.KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_CONFIG, "abc" + ), SocketAddress::inetSocketAddress, + vertx); + List optionsList = options.getAllValues(); + + + // Then: + assertThat(client, notNullValue()); + assertThat(optionsList.size(), is(4)); + HttpClientOptions sslOptions = optionsList.get(2); + assertThat(sslOptions.getHttp2MaxPoolSize(), + is(KsqlRestConfig.KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_DEFAULT)); + } +} From c50d583eb61e08fc533d324a7d0bd673ac65a86d Mon Sep 17 00:00:00 2001 From: Alan Sheinberg Date: Mon, 13 Dec 2021 16:13:03 -0800 Subject: [PATCH 3/3] Feedback 2 --- .../main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java index 4df18f37737a..3f0fd4dd6f72 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java @@ -383,7 +383,7 @@ public class KsqlRestConfig extends AbstractConfig { public static final String KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_CONFIG = "ksql.internal.http2.max.pool.size"; - public static final int KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_DEFAULT = 1000; + public static final int KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_DEFAULT = 3000; public static final String KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_DOC = "The maximum connection pool size used by Vertx for http2 internal connections";