feat: support custom request headers from java client and migrations tool (#8787)
vcrfxia authored Feb 22, 2022
1 parent b53c1de commit ffe57f5
Showing 13 changed files with 226 additions and 48 deletions.
24 changes: 23 additions & 1 deletion docs/operate-and-deploy/migrations-tool.md
@@ -226,6 +226,7 @@ ksql-migrations {-c | --config-file} <config-file> apply
[ {-u | --until} <untilVersion> ]
[ {-v | --version} <version> ]
[ {-d | --define} <variableName>=<variableValue> ]
[ --headers <headersFile> ]
[ --dry-run ]
```

@@ -241,7 +242,8 @@ to apply:
In addition to selecting a mode for `ksql-migrations apply`, you must also provide
the path to the config file of your migrations project as part of the command.

If both your ksqlDB server and migration tool are version 0.18 and newer, you can
define variables by passing the `--define` flag followed by a string of the form
`name=value` any number of times. For example, the following command

```bash
@@ -307,6 +309,26 @@ containing multiple ksqlDB statements fails during the migration, it's possible
some of the statements will have been run on the ksqlDB server while later statements
have not.

You can optionally send custom request headers with all ksqlDB requests made as
part of the `apply` command. To do so, pass the location of a file containing the
headers with the `--headers` flag:

```bash
$ ksql-migrations --config-file /my/migrations/project/ksql-migrations.properties apply --next --headers /my/migrations/project/request_headers.txt
```

Format your headers file with one header name and value pair on each line,
separated either with a colon or an equals sign. Both of the following are valid:
```
X-My-Custom-Header: abcdefg
X-My-Other-Custom-Header: asdfgh
```
or
```
X-My-Custom-Header=abcdefg
X-My-Other-Custom-Header=asdfgh
```
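
The Java client half of this commit exposes the same capability through
`ClientOptions#setRequestHeaders`. A minimal sketch of configuring the client with
custom headers, assuming a ksqlDB server on `localhost:8088` (the host, port, and
header values are placeholders):

```java
import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.ClientOptions;
import java.util.HashMap;
import java.util.Map;

public final class CustomHeadersExample {
  public static void main(final String[] args) {
    final Map<String, String> headers = new HashMap<>();
    headers.put("X-My-Custom-Header", "abcdefg");    // placeholder header values
    headers.put("X-My-Other-Custom-Header", "asdfgh");

    final ClientOptions options = ClientOptions.create()
        .setHost("localhost")                        // placeholder host
        .setPort(8088)                               // placeholder port
        .setRequestHeaders(headers);                 // sent with every request

    final Client client = Client.create(options);
    // ... issue requests as usual; each one carries the custom headers ...
    client.close();
  }
}
```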

View Current Migration Status
-----------------------------

@@ -16,6 +16,7 @@
package io.confluent.ksql.api.client;

import io.confluent.ksql.api.client.impl.ClientOptionsImpl;
import java.util.Map;

/**
* Options for the ksqlDB {@link Client}.
@@ -143,6 +144,23 @@ public interface ClientOptions {
*/
ClientOptions setHttp2MultiplexingLimit(int http2MultiplexingLimit);

/**
* Sets custom request headers to be sent with requests to the ksqlDB server.
* These headers are in addition to any automatic headers such as the
* authorization header.
*
* <p>If this method is called more than once, only the headers passed on
* the last invocation will be used. To update existing custom headers,
* use this method in combination with {@link ClientOptions#getRequestHeaders()}.
*
* <p>In case of overlap between these custom headers and automatic headers such
* as the authorization header, these custom headers take precedence.
*
* @param requestHeaders custom request headers
* @return a reference to this
*/
ClientOptions setRequestHeaders(Map<String, String> requestHeaders);
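
A minimal sketch of the read-modify-set pattern this javadoc describes (the helper
class and header names below are illustrative, not part of this commit):

```java
import io.confluent.ksql.api.client.ClientOptions;
import java.util.HashMap;
import java.util.Map;

final class UpdateHeadersSketch {
  // Adds one header to whatever custom headers are already configured.
  static void addHeader(final ClientOptions options, final String name, final String value) {
    final Map<String, String> updated = new HashMap<>(options.getRequestHeaders()); // copy current headers
    updated.put(name, value);              // add or overwrite the single entry
    options.setRequestHeaders(updated);    // setRequestHeaders replaces the whole set
  }
}
```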

/**
* Returns the host name of the ksqlDB server to connect to.
*
@@ -255,6 +273,14 @@ public interface ClientOptions {
*/
int getHttp2MultiplexingLimit();

/**
* Returns a copy of the custom request headers to be sent with ksqlDB requests.
* If not set, then this method returns an empty map.
*
* @return custom request headers
*/
Map<String, String> getRequestHeaders();

/**
* Creates a copy of these {@code ClientOptions}.
*
@@ -60,6 +60,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@@ -556,6 +557,11 @@ private <T extends CompletableFuture<?>> void makeRequest(
if (clientOptions.isUseBasicAuth()) {
request = configureBasicAuth(request);
}
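// add any custom request headers configured via ClientOptions#setRequestHeaders to this request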
if (clientOptions.getRequestHeaders() != null) {
for (final Entry<String, String> entry : clientOptions.getRequestHeaders().entrySet()) {
request.putHeader(entry.getKey(), entry.getValue());
}
}
if (endRequest) {
request.end(requestBody);
} else {
@@ -15,7 +15,10 @@

package io.confluent.ksql.api.client.impl;

import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.api.client.ClientOptions;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class ClientOptionsImpl implements ClientOptions {
@@ -36,6 +39,7 @@ public class ClientOptionsImpl implements ClientOptions {
private String basicAuthPassword;
private int executeQueryMaxResultRows = ClientOptions.DEFAULT_EXECUTE_QUERY_MAX_RESULT_ROWS;
private int http2MultiplexingLimit = ClientOptions.DEFAULT_HTTP2_MULTIPLEXING_LIMIT;
private Map<String, String> requestHeaders;

/**
* {@code ClientOptions} should be instantiated via {@link ClientOptions#create}, NOT via this
@@ -53,7 +57,8 @@ private ClientOptionsImpl(
final String trustStorePath, final String trustStorePassword,
final String keyStorePath, final String keyStorePassword, final String keyPassword,
final String keyAlias, final String basicAuthUsername, final String basicAuthPassword,
final int executeQueryMaxResultRows, final int http2MultiplexingLimit,
final Map<String, String> requestHeaders) {
this.host = Objects.requireNonNull(host);
this.port = port;
this.useTls = useTls;
@@ -70,6 +75,7 @@ private ClientOptionsImpl(
this.basicAuthPassword = basicAuthPassword;
this.executeQueryMaxResultRows = executeQueryMaxResultRows;
this.http2MultiplexingLimit = http2MultiplexingLimit;
this.requestHeaders = requestHeaders;
}

@Override
@@ -158,6 +164,12 @@ public ClientOptions setHttp2MultiplexingLimit(final int http2MultiplexingLimit)
return this;
}

@Override
public ClientOptions setRequestHeaders(final Map<String, String> requestHeaders) {
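// store an immutable defensive copy so later changes to the caller's map have no effect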
this.requestHeaders = requestHeaders == null ? null : ImmutableMap.copyOf(requestHeaders);
return this;
}

@Override
public String getHost() {
return host == null ? "" : host;
@@ -238,6 +250,11 @@ public int getHttp2MultiplexingLimit() {
return http2MultiplexingLimit;
}

@Override
public Map<String, String> getRequestHeaders() {
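// return a mutable copy; the stored headers are never exposed directly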
return requestHeaders == null ? new HashMap<>() : new HashMap<>(requestHeaders);
}

@Override
public ClientOptions copy() {
return new ClientOptionsImpl(
@@ -247,7 +264,8 @@ public ClientOptions copy() {
trustStorePath, trustStorePassword,
keyStorePath, keyStorePassword, keyPassword, keyAlias,
basicAuthUsername, basicAuthPassword,
executeQueryMaxResultRows, http2MultiplexingLimit,
requestHeaders);
}

// CHECKSTYLE_RULES.OFF: CyclomaticComplexity
@@ -275,14 +293,16 @@ public boolean equals(final Object o) {
&& Objects.equals(keyAlias, that.keyAlias)
&& Objects.equals(basicAuthUsername, that.basicAuthUsername)
&& Objects.equals(basicAuthPassword, that.basicAuthPassword)
&& http2MultiplexingLimit == that.http2MultiplexingLimit
&& Objects.equals(requestHeaders, that.requestHeaders);
}

@Override
public int hashCode() {
return Objects.hash(host, port, useTls, verifyHost, useAlpn, trustStorePath,
trustStorePassword, keyStorePath, keyStorePassword, keyPassword, keyAlias,
basicAuthUsername, basicAuthPassword, executeQueryMaxResultRows, http2MultiplexingLimit,
requestHeaders);
}

@Override
@@ -301,8 +321,9 @@ public String toString() {
+ ", keyAlias='" + keyAlias + '\''
+ ", basicAuthUsername='" + basicAuthUsername + '\''
+ ", basicAuthPassword='" + basicAuthPassword + '\''
+ ", executeQueryMaxResultRows=" + executeQueryMaxResultRows + '\''
+ ", executeQueryMaxResultRows=" + executeQueryMaxResultRows
+ ", http2MultiplexingLimit=" + http2MultiplexingLimit
+ ", requestHeaders='" + requestHeaders + '\''
+ '}';
}
}
@@ -57,9 +57,7 @@ protected KsqlRestConfig createServerConfig() {

@Override
protected ClientOptions createJavaClientOptions() {
return super.createJavaClientOptions()
.setBasicAuthCredentials(USER_WITH_ACCESS, USER_WITH_ACCESS_PWD);
}

@@ -43,7 +43,6 @@
import io.confluent.ksql.api.client.exception.KsqlClientException;
import io.confluent.ksql.api.client.exception.KsqlException;
import io.confluent.ksql.api.client.impl.ConnectorTypeImpl;
import io.confluent.ksql.api.client.impl.ExecuteStatementResultImpl;
import io.confluent.ksql.api.client.impl.StreamedQueryResultImpl;
import io.confluent.ksql.api.client.util.ClientTestUtil;
import io.confluent.ksql.api.client.util.ClientTestUtil.TestSubscriber;
@@ -100,6 +99,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
@@ -108,6 +108,9 @@
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo.ConnectorState;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeDiagnosingMatcher;
import org.junit.Test;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
Expand Down Expand Up @@ -139,6 +142,8 @@ public class ClientTest extends BaseApiTest {
protected static final org.apache.kafka.connect.runtime.rest.entities.ConnectorType SOURCE_TYPE =
org.apache.kafka.connect.runtime.rest.entities.ConnectorType.SOURCE;

protected static final Map<String, String> REQUEST_HEADERS = ImmutableMap.of("h1", "v1", "h2", "v2");

protected Client javaClient;

@Override
@@ -1761,14 +1766,40 @@ public void setSessionVariablesWithHttpRequest() throws Exception {
assertThat(testEndpoints.getLastProperties(), is(new JsonObject().put("auto.offset.reset", "earliest")));
}

@Test
public void shouldSendCustomRequestHeaders() throws Exception {
// Given:
final CommandStatusEntity entity = new CommandStatusEntity(
"CSAS;",
new CommandId("STREAM", "FOO", "CREATE"),
new CommandStatus(
CommandStatus.Status.SUCCESS,
"Success"
),
0L
);
testEndpoints.setKsqlEndpointResponse(Collections.singletonList(entity));

// When:
javaClient.executeStatement("CSAS;").get();

// Then:
final List<Entry<String, String>> requestHeaders =
testEndpoints.getLastApiSecurityContext().getRequestHeaders();
for (final Entry<String, String> header : REQUEST_HEADERS.entrySet()) {
assertThat(requestHeaders, hasItems(entry(header)));
}
}

protected Client createJavaClient() {
return Client.create(createJavaClientOptions(), vertx);
}

protected ClientOptions createJavaClientOptions() {
return ClientOptions.create()
.setHost("localhost")
.setPort(server.getListeners().get(0).getPort())
.setRequestHeaders(REQUEST_HEADERS);
}

private void verifyPushQueryServerState(final String sql) {
@@ -2069,4 +2100,30 @@ public LegacySourceDescriptionEntity(
this.warnings = warnings;
}
}

private static Matcher<? super Entry<String, String>> entry(
final Entry<String, String> entry
) {
return new TypeSafeDiagnosingMatcher<Entry<String, String>>() {
@Override
protected boolean matchesSafely(
final Entry<String, String> actual,
final Description mismatchDescription) {
if (!entry.getKey().equals(actual.getKey())) {
return false;
}
if (!entry.getValue().equals(actual.getValue())) {
return false;
}
return true;
}

@Override
public void describeTo(final Description description) {
description.appendText(String.format(
"key: %s. value: %s",
entry.getKey(), entry.getValue()));
}
};
}
}
@@ -59,9 +59,7 @@ protected KsqlRestConfig createServerConfig() {

@Override
protected ClientOptions createJavaClientOptions() {
return super.createJavaClientOptions()
.setUseTls(true)
.setTrustStore(TRUST_STORE_PATH)
.setTrustStorePassword(TRUST_STORE_PASSWORD)
@@ -17,6 +17,7 @@

import com.google.common.testing.EqualsTester;
import io.confluent.ksql.api.client.ClientOptions;
import java.util.Collections;
import org.junit.Test;

public class ClientOptionsImplTest {
@@ -66,6 +67,9 @@ public void shouldImplementHashCodeAndEquals() {
.addEqualityGroup(
ClientOptions.create().setHttp2MultiplexingLimit(5)
)
.addEqualityGroup(
ClientOptions.create().setRequestHeaders(Collections.singletonMap("h1", "v1"))
)
.testEquals();
}

@@ -187,7 +187,8 @@ private static String getServiceId(final Map<String, String> configs) throws Mig
configs.get(SSL_KEY_PASSWORD),
configs.get(SSL_KEY_ALIAS),
configs.getOrDefault(SSL_ALPN, "false").equalsIgnoreCase("true"),
configs.getOrDefault(SSL_VERIFY_HOST, "true").equalsIgnoreCase("true"),
null
);
final String serviceId;
try {
Expand Down