Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Prevents internal http2 requests from having shared connections closed #8507

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,12 @@ public class KsqlRestConfig extends AbstractConfig {
+ " otherwise be verbose. Note that this works on the entire URI, respecting the "
+ KSQL_ENDPOINT_LOGGING_LOG_QUERIES_CONFIG + " configuration";

public static final String KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_CONFIG
= "ksql.internal.http2.max.pool.size";
public static final int KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_DEFAULT = 3000;
public static final String KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_DOC =
"The maximum connection pool size used by Vertx for http2 internal connections";

private static final ConfigDef CONFIG_DEF;

static {
Expand Down Expand Up @@ -722,6 +728,12 @@ public class KsqlRestConfig extends AbstractConfig {
KSQL_ENDPOINT_LOGGING_LOG_QUERIES_DEFAULT,
Importance.LOW,
KSQL_ENDPOINT_LOGGING_LOG_QUERIES_DOC
).define(
KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_CONFIG,
Type.INT,
KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_DEFAULT,
Importance.LOW,
KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_DOC
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,11 @@
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class InternalKsqlClientFactory {
private static final Logger LOG = LoggerFactory.getLogger(InternalKsqlClientFactory.class);

private InternalKsqlClientFactory() {

Expand Down Expand Up @@ -59,9 +62,9 @@ public static KsqlClient createInternalClient(

private static Function<Boolean, HttpClientOptions> httpOptionsFactory(
final Map<String, String> clientProps, final boolean verifyHost,
final Function<Boolean, HttpClientOptions> clientOptions) {
final BiFunction<Map<String, String>, Boolean, HttpClientOptions> clientOptions) {
return (tls) -> {
final HttpClientOptions httpClientOptions = clientOptions.apply(tls);
final HttpClientOptions httpClientOptions = clientOptions.apply(clientProps, tls);
if (!tls) {
return httpClientOptions;
}
Expand All @@ -88,12 +91,38 @@ private static Function<Boolean, HttpClientOptions> httpOptionsFactory(
};
}

private static HttpClientOptions createClientOptions(final boolean tls) {
private static HttpClientOptions createClientOptions(
final Map<String, String> clientProps,
final boolean tls
) {
return new HttpClientOptions().setMaxPoolSize(100);
}

private static HttpClientOptions createClientOptionsHttp2(final boolean tls) {
return new HttpClientOptions().setHttp2MaxPoolSize(100).setProtocolVersion(HttpVersion.HTTP_2)
private static HttpClientOptions createClientOptionsHttp2(
final Map<String, String> clientProps,
final boolean tls
) {
final String size = clientProps.get(
KsqlRestConfig.KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_CONFIG);
int sizeInt;
if (size != null) {
try {
sizeInt = Integer.parseInt(size);
} catch (NumberFormatException e) {
LOG.error("Bad int passed in for " + KsqlRestConfig.KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_CONFIG
+ ", using " + KsqlRestConfig.KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_DEFAULT, e);
sizeInt = KsqlRestConfig.KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_DEFAULT;
}
} else {
sizeInt = KsqlRestConfig.KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_DEFAULT;
}
return new HttpClientOptions()
// At the moment, we cannot asynchronously end long-running queries in http2, in a way that
// we can with http1.1, by just closing the connection. For that reason, we've disabled
// multiplexing: https://github.com/confluentinc/ksql/issues/8505
.setHttp2MultiplexingLimit(1)
.setHttp2MaxPoolSize(sizeInt)
.setProtocolVersion(HttpVersion.HTTP_2)
.setUseAlpn(tls);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package io.confluent.ksql.rest.server.services;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.rest.client.KsqlClient;
import io.confluent.ksql.rest.server.KsqlRestConfig;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.net.SocketAddress;
import java.util.List;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
public class InternalKsqlClientFactoryTest {

@Mock
private Vertx vertx;
@Mock
private HttpClient httpClient;
@Captor
private ArgumentCaptor<HttpClientOptions> options;

@Before
public void setUp() {
when(vertx.createHttpClient(options.capture())).thenReturn(httpClient);
}

@Test
public void shouldCreateClient() {
// When:
KsqlClient client = InternalKsqlClientFactory.createInternalClient(
ImmutableMap.of(), SocketAddress::inetSocketAddress,
vertx);

// Then:
assertThat(client, notNullValue());
}

@Test
public void shouldCreateClient_http2() {
// When:
KsqlClient client = InternalKsqlClientFactory.createInternalClient(
ImmutableMap.of(
KsqlRestConfig.KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_CONFIG, "1234"
), SocketAddress::inetSocketAddress,
vertx);
List<HttpClientOptions> optionsList = options.getAllValues();


// Then:
assertThat(client, notNullValue());
assertThat(optionsList.size(), is(4));
HttpClientOptions sslOptions = optionsList.get(2);
assertThat(sslOptions.getHttp2MaxPoolSize(), is(1234));
}

@Test
public void shouldCreateClient_http2_badPoolSize() {
// When:
KsqlClient client = InternalKsqlClientFactory.createInternalClient(
ImmutableMap.of(
KsqlRestConfig.KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_CONFIG, "abc"
), SocketAddress::inetSocketAddress,
vertx);
List<HttpClientOptions> optionsList = options.getAllValues();


// Then:
assertThat(client, notNullValue());
assertThat(optionsList.size(), is(4));
HttpClientOptions sslOptions = optionsList.get(2);
assertThat(sslOptions.getHttp2MaxPoolSize(),
is(KsqlRestConfig.KSQL_INTERNAL_HTTP2_MAX_POOL_SIZE_DEFAULT));
}
}