Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support custom request headers from java client and migrations tool #8787

Merged
merged 6 commits into from
Feb 22, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion docs/operate-and-deploy/migrations-tool.md
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ ksql-migrations {-c | --config-file} <config-file> apply
[ {-u | --until} <untilVersion> ]
[ {-v | --version} <version> ]
[ {-d | --define} <variableName>=<variableValue> ]
[ --headers <headersFile> ]
[ --dry-run ]
```

Expand All @@ -241,7 +242,8 @@ to apply:
In addition to selecting a mode for `ksql-migrations apply`, you must also provide
the path to the config file of your migrations project as part of the command.

If both your ksqlDB server and migration tool are version 0.18 and newer, you can define variables by passing the `--define` flag followed by a string of the form
If both your ksqlDB server and migration tool are version 0.18 and newer, you can
define variables by passing the `--define` flag followed by a string of the form
`name=value` any number of times. For example, the following command

```bash
Expand Down Expand Up @@ -307,6 +309,26 @@ containing multiple ksqlDB statements fails during the migration, it's possible
some of the statements will have been run on the ksqlDB server while later statements
have not.

You can optionally pass custom request headers to be sent with all ksqlDB requests
made as part of the `apply` command by passing the location of a
file containing the custom request headers with the `--headers` flag:

```
$ ksql-migrations --config-file /my/migrations/project/ksql-migrations.properties apply --next --headers /my/migrations/project/request_headers.txt
```

Format your headers file with one header name and value pair on each line,
separated either with a colon or an equals sign. Both of the following are valid:
```
X-My-Custom-Header: abcdefg
X-My-Other-Custom-Header: asdfgh
```
or
```
X-My-Custom-Header=abcdefg
X-My-Other-Custom-Header=asdfgh
```

View Current Migration Status
-----------------------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.confluent.ksql.api.client;

import io.confluent.ksql.api.client.impl.ClientOptionsImpl;
import java.util.Map;

/**
* Options for the ksqlDB {@link Client}.
Expand Down Expand Up @@ -143,6 +144,23 @@ public interface ClientOptions {
*/
ClientOptions setHttp2MultiplexingLimit(int http2MultiplexingLimit);

/**
* Sets custom request headers to be sent with requests to the ksqlDB server.
* These headers are in addition to any automatic headers such as the
* authorization header.
*
* <p>If this method is called more than once, only the headers passed on
* the last invocation will be used. To update existing custom headers,
* use this method in combination with {@link ClientOptions#getRequestHeaders()}.
*
* <p>In case of overlap between these custom headers and automatic headers such
* as the authorization header, these custom headers take precedence.
*
* @param requestHeaders custom request headers
* @return a reference to this
*/
ClientOptions setRequestHeaders(Map<String, String> requestHeaders);

/**
* Returns the host name of the ksqlDB server to connect to.
*
Expand Down Expand Up @@ -255,6 +273,14 @@ public interface ClientOptions {
*/
int getHttp2MultiplexingLimit();

/**
* Returns a copy of the custom request headers to be sent with ksqlDB requests.
* If not set, then this method returns {@code null}.
*
* @return custom request headers
*/
Map<String, String> getRequestHeaders();

/**
* Creates a copy of these {@code ClientOptions}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -556,6 +557,11 @@ private <T extends CompletableFuture<?>> void makeRequest(
if (clientOptions.isUseBasicAuth()) {
request = configureBasicAuth(request);
}
if (clientOptions.getRequestHeaders() != null) {
for (final Entry<String, String> entry : clientOptions.getRequestHeaders().entrySet()) {
request.putHeader(entry.getKey(), entry.getValue());
}
}
if (endRequest) {
request.end(requestBody);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package io.confluent.ksql.api.client.impl;

import io.confluent.ksql.api.client.ClientOptions;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class ClientOptionsImpl implements ClientOptions {
Expand All @@ -36,6 +38,7 @@ public class ClientOptionsImpl implements ClientOptions {
private String basicAuthPassword;
private int executeQueryMaxResultRows = ClientOptions.DEFAULT_EXECUTE_QUERY_MAX_RESULT_ROWS;
private int http2MultiplexingLimit = ClientOptions.DEFAULT_HTTP2_MULTIPLEXING_LIMIT;
private Map<String, String> requestHeaders;

/**
* {@code ClientOptions} should be instantiated via {@link ClientOptions#create}, NOT via this
Expand All @@ -53,7 +56,8 @@ private ClientOptionsImpl(
final String trustStorePath, final String trustStorePassword,
final String keyStorePath, final String keyStorePassword, final String keyPassword,
final String keyAlias, final String basicAuthUsername, final String basicAuthPassword,
final int executeQueryMaxResultRows, final int http2MultiplexingLimit) {
final int executeQueryMaxResultRows, final int http2MultiplexingLimit,
final Map<String, String> requestHeaders) {
this.host = Objects.requireNonNull(host);
this.port = port;
this.useTls = useTls;
Expand All @@ -70,6 +74,7 @@ private ClientOptionsImpl(
this.basicAuthPassword = basicAuthPassword;
this.executeQueryMaxResultRows = executeQueryMaxResultRows;
this.http2MultiplexingLimit = http2MultiplexingLimit;
this.requestHeaders = requestHeaders;
}

@Override
Expand Down Expand Up @@ -158,6 +163,12 @@ public ClientOptions setHttp2MultiplexingLimit(final int http2MultiplexingLimit)
return this;
}

@Override
public ClientOptions setRequestHeaders(final Map<String, String> requestHeaders) {
this.requestHeaders = requestHeaders;
return this;
}

@Override
public String getHost() {
return host == null ? "" : host;
Expand Down Expand Up @@ -238,6 +249,11 @@ public int getHttp2MultiplexingLimit() {
return http2MultiplexingLimit;
}

@Override
public Map<String, String> getRequestHeaders() {
return requestHeaders == null ? null : new HashMap<>(requestHeaders);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be better to return an empty map?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't feel strongly, though it is a public API so we should decide now. (The javadoc I added says it returns null if it was never set but that can easily be changed.) Would you expect an empty map? I can make that change in order to be consistent with the String fields returning the empty string if not set, rather than null.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm... I tried to look for other similar API instances briefly.

I did this: https://github.com/confluentinc/ksql/blob/master/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/HttpRequestImpl.java#L29-L30 where empty maps are used rather than null values. I think that's a slight node to changing to empty maps.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, updated.

}

@Override
public ClientOptions copy() {
return new ClientOptionsImpl(
Expand All @@ -247,7 +263,8 @@ public ClientOptions copy() {
trustStorePath, trustStorePassword,
keyStorePath, keyStorePassword, keyPassword, keyAlias,
basicAuthUsername, basicAuthPassword,
executeQueryMaxResultRows, http2MultiplexingLimit);
executeQueryMaxResultRows, http2MultiplexingLimit,
requestHeaders);
}

// CHECKSTYLE_RULES.OFF: CyclomaticComplexity
Expand Down Expand Up @@ -275,14 +292,16 @@ public boolean equals(final Object o) {
&& Objects.equals(keyAlias, that.keyAlias)
&& Objects.equals(basicAuthUsername, that.basicAuthUsername)
&& Objects.equals(basicAuthPassword, that.basicAuthPassword)
&& http2MultiplexingLimit == that.http2MultiplexingLimit;
&& http2MultiplexingLimit == that.http2MultiplexingLimit
&& Objects.equals(requestHeaders, that.requestHeaders);
}

@Override
public int hashCode() {
return Objects.hash(host, port, useTls, verifyHost, useAlpn, trustStorePath,
trustStorePassword, keyStorePath, keyStorePassword, keyPassword, keyAlias,
basicAuthUsername, basicAuthPassword, executeQueryMaxResultRows, http2MultiplexingLimit);
basicAuthUsername, basicAuthPassword, executeQueryMaxResultRows, http2MultiplexingLimit,
requestHeaders);
}

@Override
Expand All @@ -301,8 +320,9 @@ public String toString() {
+ ", keyAlias='" + keyAlias + '\''
+ ", basicAuthUsername='" + basicAuthUsername + '\''
+ ", basicAuthPassword='" + basicAuthPassword + '\''
+ ", executeQueryMaxResultRows=" + executeQueryMaxResultRows + '\''
+ ", executeQueryMaxResultRows=" + executeQueryMaxResultRows
+ ", http2MultiplexingLimit=" + http2MultiplexingLimit
+ ", requestHeaders='" + requestHeaders + '\''
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Random question, have you seen this output? (Asking in case it'd look goofy or not print things.)

Separately, are any of the headers potentially sensitive / something that should not be printed / logged?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The headers are whatever users set (they're completely custom), so in theory they could be sensitive but there's no way for them to specify that as such. Do you think it's better to not log the headers then (or at least the values)? I'm not sure when/if this would actually ever get printed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there's not already some strong guidance or precedent here, we are probably fine.

+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,7 @@ protected KsqlRestConfig createServerConfig() {

@Override
protected ClientOptions createJavaClientOptions() {
return ClientOptions.create()
.setHost("localhost")
.setPort(server.getListeners().get(0).getPort())
return super.createJavaClientOptions()
.setBasicAuthCredentials(USER_WITH_ACCESS, USER_WITH_ACCESS_PWD);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import io.confluent.ksql.api.client.exception.KsqlClientException;
import io.confluent.ksql.api.client.exception.KsqlException;
import io.confluent.ksql.api.client.impl.ConnectorTypeImpl;
import io.confluent.ksql.api.client.impl.ExecuteStatementResultImpl;
import io.confluent.ksql.api.client.impl.StreamedQueryResultImpl;
import io.confluent.ksql.api.client.util.ClientTestUtil;
import io.confluent.ksql.api.client.util.ClientTestUtil.TestSubscriber;
Expand Down Expand Up @@ -100,6 +99,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
Expand All @@ -108,6 +108,9 @@
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo.ConnectorState;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeDiagnosingMatcher;
import org.junit.Test;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
Expand Down Expand Up @@ -139,6 +142,8 @@ public class ClientTest extends BaseApiTest {
protected static final org.apache.kafka.connect.runtime.rest.entities.ConnectorType SOURCE_TYPE =
org.apache.kafka.connect.runtime.rest.entities.ConnectorType.SOURCE;

protected static final Map<String, String> REQUEST_HEADERS = ImmutableMap.of("h1", "v1", "h2", "v2");

protected Client javaClient;

@Override
Expand Down Expand Up @@ -1761,14 +1766,40 @@ public void setSessionVariablesWithHttpRequest() throws Exception {
assertThat(testEndpoints.getLastProperties(), is(new JsonObject().put("auto.offset.reset", "earliest")));
}

@Test
public void shouldSendCustomRequestHeaders() throws Exception {
// Given:
final CommandStatusEntity entity = new CommandStatusEntity(
"CSAS;",
new CommandId("STREAM", "FOO", "CREATE"),
new CommandStatus(
CommandStatus.Status.SUCCESS,
"Success"
),
0L
);
testEndpoints.setKsqlEndpointResponse(Collections.singletonList(entity));

// When:
javaClient.executeStatement("CSAS;").get();

// Then:
final List<Entry<String, String>> requestHeaders =
testEndpoints.getLastApiSecurityContext().getRequestHeaders();
for (final Entry<String, String> header : REQUEST_HEADERS.entrySet()) {
assertThat(requestHeaders, hasItems(entry(header)));
}
}

protected Client createJavaClient() {
return Client.create(createJavaClientOptions(), vertx);
}

protected ClientOptions createJavaClientOptions() {
return ClientOptions.create()
.setHost("localhost")
.setPort(server.getListeners().get(0).getPort());
.setPort(server.getListeners().get(0).getPort())
.setRequestHeaders(REQUEST_HEADERS);
}

private void verifyPushQueryServerState(final String sql) {
Expand Down Expand Up @@ -2069,4 +2100,30 @@ public LegacySourceDescriptionEntity(
this.warnings = warnings;
}
}

private static Matcher<? super Entry<String, String>> entry(
final Entry<String, String> entry
) {
return new TypeSafeDiagnosingMatcher<Entry<String, String>>() {
@Override
protected boolean matchesSafely(
final Entry<String, String> actual,
final Description mismatchDescription) {
if (!entry.getKey().equals(actual.getKey())) {
return false;
}
if (!entry.getValue().equals(actual.getValue())) {
return false;
}
return true;
}

@Override
public void describeTo(final Description description) {
description.appendText(String.format(
"key: %s. value: %s",
entry.getKey(), entry.getValue()));
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,7 @@ protected KsqlRestConfig createServerConfig() {

@Override
protected ClientOptions createJavaClientOptions() {
return ClientOptions.create()
.setHost("localhost")
.setPort(server.getListeners().get(0).getPort())
return super.createJavaClientOptions()
.setUseTls(true)
.setTrustStore(TRUST_STORE_PATH)
.setTrustStorePassword(TRUST_STORE_PASSWORD)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.google.common.testing.EqualsTester;
import io.confluent.ksql.api.client.ClientOptions;
import java.util.Collections;
import org.junit.Test;

public class ClientOptionsImplTest {
Expand Down Expand Up @@ -66,6 +67,9 @@ public void shouldImplementHashCodeAndEquals() {
.addEqualityGroup(
ClientOptions.create().setHttp2MultiplexingLimit(5)
)
.addEqualityGroup(
ClientOptions.create().setRequestHeaders(Collections.singletonMap("h1", "v1"))
)
.testEquals();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,8 @@ private static String getServiceId(final Map<String, String> configs) throws Mig
configs.get(SSL_KEY_PASSWORD),
configs.get(SSL_KEY_ALIAS),
configs.getOrDefault(SSL_ALPN, "false").equalsIgnoreCase("true"),
configs.getOrDefault(SSL_VERIFY_HOST, "true").equalsIgnoreCase("true")
configs.getOrDefault(SSL_VERIFY_HOST, "true").equalsIgnoreCase("true"),
null
);
final String serviceId;
try {
Expand Down
Loading