diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java index fc3cf301cbc9..3f0fd4dd6f72 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java @@ -381,6 +381,12 @@ public class KsqlRestConfig extends AbstractConfig { + " otherwise be verbose. Note that this works on the entire URI, respecting the " + KSQL_ENDPOINT_LOGGING_LOG_QUERIES_CONFIG + " configuration"; + public static final String KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_CONFIG + = "ksql.internal.http2.max.pool.size"; + public static final int KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_DEFAULT = 3000; + public static final String KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_DOC = + "The maximum connection pool size used by Vertx for http2 internal connections"; + private static final ConfigDef CONFIG_DEF; static { @@ -722,6 +728,12 @@ public class KsqlRestConfig extends AbstractConfig { KSQL_ENDPOINT_LOGGING_LOG_QUERIES_DEFAULT, Importance.LOW, KSQL_ENDPOINT_LOGGING_LOG_QUERIES_DOC + ).define( + KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_CONFIG, + Type.INT, + KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_DEFAULT, + Importance.LOW, + KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_DOC ); } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/services/InternalKsqlClientFactory.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/services/InternalKsqlClientFactory.java index 36a41a1fb8eb..060e4dc37e9c 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/services/InternalKsqlClientFactory.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/services/InternalKsqlClientFactory.java @@ -30,8 +30,11 @@ import java.util.Optional; import java.util.function.BiFunction; import java.util.function.Function; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public final class InternalKsqlClientFactory { + private static final Logger LOG = LoggerFactory.getLogger(InternalKsqlClientFactory.class); private InternalKsqlClientFactory() { @@ -59,9 +62,9 @@ public static KsqlClient createInternalClient( private static Function httpOptionsFactory( final Map clientProps, final boolean verifyHost, - final Function clientOptions) { + final BiFunction, Boolean, HttpClientOptions> clientOptions) { return (tls) -> { - final HttpClientOptions httpClientOptions = clientOptions.apply(tls); + final HttpClientOptions httpClientOptions = clientOptions.apply(clientProps, tls); if (!tls) { return httpClientOptions; } @@ -88,12 +91,38 @@ private static Function httpOptionsFactory( }; } - private static HttpClientOptions createClientOptions(final boolean tls) { + private static HttpClientOptions createClientOptions( + final Map clientProps, + final boolean tls + ) { return new HttpClientOptions().setMaxPoolSize(100); } - private static HttpClientOptions createClientOptionsHttp2(final boolean tls) { - return new HttpClientOptions().setHttp2MaxPoolSize(100).setProtocolVersion(HttpVersion.HTTP_2) + private static HttpClientOptions createClientOptionsHttp2( + final Map clientProps, + final boolean tls + ) { + final String size = clientProps.get( + KsqlRestConfig.KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_CONFIG); + int sizeInt; + if (size != null) { + try { + sizeInt = Integer.parseInt(size); + } catch (NumberFormatException e) { + LOG.error("Bad int passed in for " + KsqlRestConfig.KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_CONFIG + + ", using " + KsqlRestConfig.KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_DEFAULT, e); + sizeInt = KsqlRestConfig.KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_DEFAULT; + } + } else { + sizeInt = KsqlRestConfig.KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_DEFAULT; + } + return new HttpClientOptions() + // At the moment, we cannot asynchronously end long-running queries in http2, in a way that + // we can with http1.1, by just closing the connection. For that reason, we've disabled + // multiplexing: https://github.com/confluentinc/ksql/issues/8505 + .setHttp2MultiplexingLimit(1) + .setHttp2MaxPoolSize(sizeInt) + .setProtocolVersion(HttpVersion.HTTP_2) .setUseAlpn(tls); } } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/services/InternalKsqlClientFactoryTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/services/InternalKsqlClientFactoryTest.java new file mode 100644 index 000000000000..126c4fcaa1b6 --- /dev/null +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/services/InternalKsqlClientFactoryTest.java @@ -0,0 +1,86 @@ +package io.confluent.ksql.rest.server.services; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableMap; +import io.confluent.ksql.rest.client.KsqlClient; +import io.confluent.ksql.rest.server.KsqlRestConfig; +import io.vertx.core.Vertx; +import io.vertx.core.http.HttpClient; +import io.vertx.core.http.HttpClientOptions; +import io.vertx.core.net.SocketAddress; +import java.util.List; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class InternalKsqlClientFactoryTest { + + @Mock + private Vertx vertx; + @Mock + private HttpClient httpClient; + @Captor + private ArgumentCaptor options; + + @Before + public void setUp() { + when(vertx.createHttpClient(options.capture())).thenReturn(httpClient); + } + + @Test + public void shouldCreateClient() { + // When: + KsqlClient client = InternalKsqlClientFactory.createInternalClient( + ImmutableMap.of(), SocketAddress::inetSocketAddress, + vertx); + + // Then: + assertThat(client, notNullValue()); + } + + @Test + public void shouldCreateClient_http2() { + // When: + KsqlClient client = InternalKsqlClientFactory.createInternalClient( + ImmutableMap.of( + KsqlRestConfig.KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_CONFIG, "1234" + ), SocketAddress::inetSocketAddress, + vertx); + List optionsList = options.getAllValues(); + + + // Then: + assertThat(client, notNullValue()); + assertThat(optionsList.size(), is(4)); + HttpClientOptions sslOptions = optionsList.get(2); + assertThat(sslOptions.getHttp2MaxPoolSize(), is(1234)); + } + + @Test + public void shouldCreateClient_http2_badPoolSize() { + // When: + KsqlClient client = InternalKsqlClientFactory.createInternalClient( + ImmutableMap.of( + KsqlRestConfig.KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_CONFIG, "abc" + ), SocketAddress::inetSocketAddress, + vertx); + List optionsList = options.getAllValues(); + + + // Then: + assertThat(client, notNullValue()); + assertThat(optionsList.size(), is(4)); + HttpClientOptions sslOptions = optionsList.get(2); + assertThat(sslOptions.getHttp2MaxPoolSize(), + is(KsqlRestConfig.KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_DEFAULT)); + } +}