Skip to content

Commit

Permalink
fix: Prevents internal http2 requests from having shared connections …
Browse files Browse the repository at this point in the history
…closed (#8507)

* fix: Prevents internal http2 requests from having shared connections closed
  • Loading branch information
AlanConfluent authored Dec 14, 2021
1 parent 0007228 commit 5b0be58
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,12 @@ public class KsqlRestConfig extends AbstractConfig {
+ " otherwise be verbose. Note that this works on the entire URI, respecting the "
+ KSQL_ENDPOINT_LOGGING_LOG_QUERIES_CONFIG + " configuration";

public static final String KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_CONFIG
= "ksql.internal.http2.max.pool.size";
public static final int KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_DEFAULT = 3000;
public static final String KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_DOC =
"The maximum connection pool size used by Vertx for http2 internal connections";

private static final ConfigDef CONFIG_DEF;

static {
Expand Down Expand Up @@ -722,6 +728,12 @@ public class KsqlRestConfig extends AbstractConfig {
KSQL_ENDPOINT_LOGGING_LOG_QUERIES_DEFAULT,
Importance.LOW,
KSQL_ENDPOINT_LOGGING_LOG_QUERIES_DOC
).define(
KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_CONFIG,
Type.INT,
KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_DEFAULT,
Importance.LOW,
KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_DOC
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,11 @@
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class InternalKsqlClientFactory {
private static final Logger LOG = LoggerFactory.getLogger(InternalKsqlClientFactory.class);

private InternalKsqlClientFactory() {

Expand Down Expand Up @@ -59,9 +62,9 @@ public static KsqlClient createInternalClient(

private static Function<Boolean, HttpClientOptions> httpOptionsFactory(
final Map<String, String> clientProps, final boolean verifyHost,
final Function<Boolean, HttpClientOptions> clientOptions) {
final BiFunction<Map<String, String>, Boolean, HttpClientOptions> clientOptions) {
return (tls) -> {
final HttpClientOptions httpClientOptions = clientOptions.apply(tls);
final HttpClientOptions httpClientOptions = clientOptions.apply(clientProps, tls);
if (!tls) {
return httpClientOptions;
}
Expand All @@ -88,12 +91,38 @@ private static Function<Boolean, HttpClientOptions> httpOptionsFactory(
};
}

private static HttpClientOptions createClientOptions(final boolean tls) {
private static HttpClientOptions createClientOptions(
final Map<String, String> clientProps,
final boolean tls
) {
return new HttpClientOptions().setMaxPoolSize(100);
}

private static HttpClientOptions createClientOptionsHttp2(final boolean tls) {
return new HttpClientOptions().setHttp2MaxPoolSize(100).setProtocolVersion(HttpVersion.HTTP_2)
private static HttpClientOptions createClientOptionsHttp2(
final Map<String, String> clientProps,
final boolean tls
) {
final String size = clientProps.get(
KsqlRestConfig.KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_CONFIG);
int sizeInt;
if (size != null) {
try {
sizeInt = Integer.parseInt(size);
} catch (NumberFormatException e) {
LOG.error("Bad int passed in for " + KsqlRestConfig.KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_CONFIG
+ ", using " + KsqlRestConfig.KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_DEFAULT, e);
sizeInt = KsqlRestConfig.KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_DEFAULT;
}
} else {
sizeInt = KsqlRestConfig.KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_DEFAULT;
}
return new HttpClientOptions()
// At the moment, we cannot asynchronously end long-running queries in http2, in a way that
// we can with http1.1, by just closing the connection. For that reason, we've disabled
// multiplexing: https://github.com/confluentinc/ksql/issues/8505
.setHttp2MultiplexingLimit(1)
.setHttp2MaxPoolSize(sizeInt)
.setProtocolVersion(HttpVersion.HTTP_2)
.setUseAlpn(tls);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package io.confluent.ksql.rest.server.services;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.rest.client.KsqlClient;
import io.confluent.ksql.rest.server.KsqlRestConfig;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.net.SocketAddress;
import java.util.List;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
public class InternalKsqlClientFactoryTest {

@Mock
private Vertx vertx;
@Mock
private HttpClient httpClient;
@Captor
private ArgumentCaptor<HttpClientOptions> options;

@Before
public void setUp() {
when(vertx.createHttpClient(options.capture())).thenReturn(httpClient);
}

@Test
public void shouldCreateClient() {
// When:
KsqlClient client = InternalKsqlClientFactory.createInternalClient(
ImmutableMap.of(), SocketAddress::inetSocketAddress,
vertx);

// Then:
assertThat(client, notNullValue());
}

@Test
public void shouldCreateClient_http2() {
// When:
KsqlClient client = InternalKsqlClientFactory.createInternalClient(
ImmutableMap.of(
KsqlRestConfig.KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_CONFIG, "1234"
), SocketAddress::inetSocketAddress,
vertx);
List<HttpClientOptions> optionsList = options.getAllValues();


// Then:
assertThat(client, notNullValue());
assertThat(optionsList.size(), is(4));
HttpClientOptions sslOptions = optionsList.get(2);
assertThat(sslOptions.getHttp2MaxPoolSize(), is(1234));
}

@Test
public void shouldCreateClient_http2_badPoolSize() {
// When:
KsqlClient client = InternalKsqlClientFactory.createInternalClient(
ImmutableMap.of(
KsqlRestConfig.KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_CONFIG, "abc"
), SocketAddress::inetSocketAddress,
vertx);
List<HttpClientOptions> optionsList = options.getAllValues();


// Then:
assertThat(client, notNullValue());
assertThat(optionsList.size(), is(4));
HttpClientOptions sslOptions = optionsList.get(2);
assertThat(sslOptions.getHttp2MaxPoolSize(),
is(KsqlRestConfig.KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_DEFAULT));
}
}

0 comments on commit 5b0be58

Please sign in to comment.