From 58b32b2a065b5de9f51b4f6e2f942e1d36116731 Mon Sep 17 00:00:00 2001 From: Victoria Xia Date: Thu, 17 Feb 2022 09:35:33 -0800 Subject: [PATCH 1/6] feat: support custom request headers from java client --- .../ksql/api/client/ClientOptions.java | 26 ++++++++ .../ksql/api/client/impl/ClientImpl.java | 6 ++ .../api/client/impl/ClientOptionsImpl.java | 30 +++++++-- .../ksql/api/client/ClientBasicAuthTest.java | 4 +- .../confluent/ksql/api/client/ClientTest.java | 61 ++++++++++++++++++- .../ksql/api/client/ClientTlsTest.java | 4 +- .../client/impl/ClientOptionsImplTest.java | 4 ++ 7 files changed, 122 insertions(+), 13 deletions(-) diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/ClientOptions.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/ClientOptions.java index 59632a90bde5..31df5d854148 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/ClientOptions.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/ClientOptions.java @@ -16,6 +16,7 @@ package io.confluent.ksql.api.client; import io.confluent.ksql.api.client.impl.ClientOptionsImpl; +import java.util.Map; /** * Options for the ksqlDB {@link Client}. @@ -143,6 +144,23 @@ public interface ClientOptions { */ ClientOptions setHttp2MultiplexingLimit(int http2MultiplexingLimit); + /** + * Sets custom request headers to be sent with requests to the ksqlDB server. + * These headers are in addition to any automatic headers such as the + * authorization header. + * + *

If this method is called more than once, only the headers passed on + * the last invocation will be used. To update existing custom headers, + * use this method in combination with {@link ClientOptions#getRequestHeaders()}. + * + *

In case of overlap between these custom headers and automatic headers such + * as the authorization header, these custom headers take precedence. + * + * @param requestHeaders custom request headers + * @return a reference to this + */ + ClientOptions setRequestHeaders(Map requestHeaders); + /** * Returns the host name of the ksqlDB server to connect to. * @@ -255,6 +273,14 @@ public interface ClientOptions { */ int getHttp2MultiplexingLimit(); + /** + * Returns a copy of the custom request headers to be sent with ksqlDB requests. + * If not set, then this method returns {@code null}. + * + * @return custom request headers + */ + Map getRequestHeaders(); + /** * Creates a copy of these {@code ClientOptions}. * diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java index 022f8184abdd..ae62cbcb921f 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java @@ -60,6 +60,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -556,6 +557,11 @@ private > void makeRequest( if (clientOptions.isUseBasicAuth()) { request = configureBasicAuth(request); } + if (clientOptions.getRequestHeaders() != null) { + for (final Entry entry : clientOptions.getRequestHeaders().entrySet()) { + request.putHeader(entry.getKey(), entry.getValue()); + } + } if (endRequest) { request.end(requestBody); } else { diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientOptionsImpl.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientOptionsImpl.java index 7138ed483f33..49380292418b 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientOptionsImpl.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientOptionsImpl.java @@ -16,6 +16,8 @@ package io.confluent.ksql.api.client.impl; import io.confluent.ksql.api.client.ClientOptions; +import java.util.HashMap; +import java.util.Map; import java.util.Objects; public class ClientOptionsImpl implements ClientOptions { @@ -36,6 +38,7 @@ public class ClientOptionsImpl implements ClientOptions { private String basicAuthPassword; private int executeQueryMaxResultRows = ClientOptions.DEFAULT_EXECUTE_QUERY_MAX_RESULT_ROWS; private int http2MultiplexingLimit = ClientOptions.DEFAULT_HTTP2_MULTIPLEXING_LIMIT; + private Map requestHeaders; /** * {@code ClientOptions} should be instantiated via {@link ClientOptions#create}, NOT via this @@ -53,7 +56,8 @@ private ClientOptionsImpl( final String trustStorePath, final String trustStorePassword, final String keyStorePath, final String keyStorePassword, final String keyPassword, final String keyAlias, final String basicAuthUsername, final String basicAuthPassword, - final int executeQueryMaxResultRows, final int http2MultiplexingLimit) { + final int executeQueryMaxResultRows, final int http2MultiplexingLimit, + final Map requestHeaders) { this.host = Objects.requireNonNull(host); this.port = port; this.useTls = useTls; @@ -70,6 +74,7 @@ private ClientOptionsImpl( this.basicAuthPassword = basicAuthPassword; this.executeQueryMaxResultRows = executeQueryMaxResultRows; this.http2MultiplexingLimit = http2MultiplexingLimit; + this.requestHeaders = requestHeaders; } @Override @@ -158,6 +163,12 @@ public ClientOptions setHttp2MultiplexingLimit(final int http2MultiplexingLimit) return this; } + @Override + public ClientOptions setRequestHeaders(final Map requestHeaders) { + this.requestHeaders = requestHeaders; + return this; + } + @Override public String getHost() { return host == null ? "" : host; @@ -238,6 +249,11 @@ public int getHttp2MultiplexingLimit() { return http2MultiplexingLimit; } + @Override + public Map getRequestHeaders() { + return requestHeaders == null ? null : new HashMap<>(requestHeaders); + } + @Override public ClientOptions copy() { return new ClientOptionsImpl( @@ -247,7 +263,8 @@ public ClientOptions copy() { trustStorePath, trustStorePassword, keyStorePath, keyStorePassword, keyPassword, keyAlias, basicAuthUsername, basicAuthPassword, - executeQueryMaxResultRows, http2MultiplexingLimit); + executeQueryMaxResultRows, http2MultiplexingLimit, + requestHeaders); } // CHECKSTYLE_RULES.OFF: CyclomaticComplexity @@ -275,14 +292,16 @@ public boolean equals(final Object o) { && Objects.equals(keyAlias, that.keyAlias) && Objects.equals(basicAuthUsername, that.basicAuthUsername) && Objects.equals(basicAuthPassword, that.basicAuthPassword) - && http2MultiplexingLimit == that.http2MultiplexingLimit; + && http2MultiplexingLimit == that.http2MultiplexingLimit + && Objects.equals(requestHeaders, that.requestHeaders); } @Override public int hashCode() { return Objects.hash(host, port, useTls, verifyHost, useAlpn, trustStorePath, trustStorePassword, keyStorePath, keyStorePassword, keyPassword, keyAlias, - basicAuthUsername, basicAuthPassword, executeQueryMaxResultRows, http2MultiplexingLimit); + basicAuthUsername, basicAuthPassword, executeQueryMaxResultRows, http2MultiplexingLimit, + requestHeaders); } @Override @@ -301,8 +320,9 @@ public String toString() { + ", keyAlias='" + keyAlias + '\'' + ", basicAuthUsername='" + basicAuthUsername + '\'' + ", basicAuthPassword='" + basicAuthPassword + '\'' - + ", executeQueryMaxResultRows=" + executeQueryMaxResultRows + '\'' + + ", executeQueryMaxResultRows=" + executeQueryMaxResultRows + ", http2MultiplexingLimit=" + http2MultiplexingLimit + + ", requestHeaders='" + requestHeaders + '\'' + '}'; } } diff --git a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientBasicAuthTest.java b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientBasicAuthTest.java index 7670830d20c4..d6981c852c79 100644 --- a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientBasicAuthTest.java +++ b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientBasicAuthTest.java @@ -57,9 +57,7 @@ protected KsqlRestConfig createServerConfig() { @Override protected ClientOptions createJavaClientOptions() { - return ClientOptions.create() - .setHost("localhost") - .setPort(server.getListeners().get(0).getPort()) + return super.createJavaClientOptions() .setBasicAuthCredentials(USER_WITH_ACCESS, USER_WITH_ACCESS_PWD); } diff --git a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java index 8545619eaed3..2862e68ebcd6 100644 --- a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java +++ b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java @@ -43,7 +43,6 @@ import io.confluent.ksql.api.client.exception.KsqlClientException; import io.confluent.ksql.api.client.exception.KsqlException; import io.confluent.ksql.api.client.impl.ConnectorTypeImpl; -import io.confluent.ksql.api.client.impl.ExecuteStatementResultImpl; import io.confluent.ksql.api.client.impl.StreamedQueryResultImpl; import io.confluent.ksql.api.client.util.ClientTestUtil; import io.confluent.ksql.api.client.util.ClientTestUtil.TestSubscriber; @@ -100,6 +99,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Optional; import java.util.Set; import java.util.concurrent.CountDownLatch; @@ -108,6 +108,9 @@ import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo.ConnectorState; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeDiagnosingMatcher; import org.junit.Test; import org.reactivestreams.Publisher; import org.slf4j.Logger; @@ -139,6 +142,8 @@ public class ClientTest extends BaseApiTest { protected static final org.apache.kafka.connect.runtime.rest.entities.ConnectorType SOURCE_TYPE = org.apache.kafka.connect.runtime.rest.entities.ConnectorType.SOURCE; + protected static final Map REQUEST_HEADERS = ImmutableMap.of("h1", "v1", "h2", "v2"); + protected Client javaClient; @Override @@ -1761,6 +1766,31 @@ public void setSessionVariablesWithHttpRequest() throws Exception { assertThat(testEndpoints.getLastProperties(), is(new JsonObject().put("auto.offset.reset", "earliest"))); } + @Test + public void shouldSendCustomRequestHeaders() throws Exception { + // Given: + final CommandStatusEntity entity = new CommandStatusEntity( + "CSAS;", + new CommandId("STREAM", "FOO", "CREATE"), + new CommandStatus( + CommandStatus.Status.SUCCESS, + "Success" + ), + 0L + ); + testEndpoints.setKsqlEndpointResponse(Collections.singletonList(entity)); + + // When: + javaClient.executeStatement("CSAS;").get(); + + // Then: + final List> requestHeaders = + testEndpoints.getLastApiSecurityContext().getRequestHeaders(); + for (final Entry header : REQUEST_HEADERS.entrySet()) { + assertThat(requestHeaders, hasItems(entry(header))); + } + } + protected Client createJavaClient() { return Client.create(createJavaClientOptions(), vertx); } @@ -1768,7 +1798,8 @@ protected Client createJavaClient() { protected ClientOptions createJavaClientOptions() { return ClientOptions.create() .setHost("localhost") - .setPort(server.getListeners().get(0).getPort()); + .setPort(server.getListeners().get(0).getPort()) + .setRequestHeaders(REQUEST_HEADERS); } private void verifyPushQueryServerState(final String sql) { @@ -2069,4 +2100,30 @@ public LegacySourceDescriptionEntity( this.warnings = warnings; } } + + private static Matcher> entry( + final Entry entry + ) { + return new TypeSafeDiagnosingMatcher>() { + @Override + protected boolean matchesSafely( + final Entry actual, + final Description mismatchDescription) { + if (!entry.getKey().equals(actual.getKey())) { + return false; + } + if (!entry.getValue().equals(actual.getValue())) { + return false; + } + return true; + } + + @Override + public void describeTo(final Description description) { + description.appendText(String.format( + "key: %s. value: %s", + entry.getKey(), entry.getValue())); + } + }; + } } \ No newline at end of file diff --git a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTlsTest.java b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTlsTest.java index 46e7139acb6f..ec69b0a7d370 100644 --- a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTlsTest.java +++ b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTlsTest.java @@ -59,9 +59,7 @@ protected KsqlRestConfig createServerConfig() { @Override protected ClientOptions createJavaClientOptions() { - return ClientOptions.create() - .setHost("localhost") - .setPort(server.getListeners().get(0).getPort()) + return super.createJavaClientOptions() .setUseTls(true) .setTrustStore(TRUST_STORE_PATH) .setTrustStorePassword(TRUST_STORE_PASSWORD) diff --git a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/impl/ClientOptionsImplTest.java b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/impl/ClientOptionsImplTest.java index 138a99609521..e110d2d5f8c3 100644 --- a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/impl/ClientOptionsImplTest.java +++ b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/impl/ClientOptionsImplTest.java @@ -17,6 +17,7 @@ import com.google.common.testing.EqualsTester; import io.confluent.ksql.api.client.ClientOptions; +import java.util.Collections; import org.junit.Test; public class ClientOptionsImplTest { @@ -66,6 +67,9 @@ public void shouldImplementHashCodeAndEquals() { .addEqualityGroup( ClientOptions.create().setHttp2MultiplexingLimit(5) ) + .addEqualityGroup( + ClientOptions.create().setRequestHeaders(Collections.singletonMap("h1", "v1")) + ) .testEquals(); } From 9176f46b6aec55a61d658d78555c752c9af3e750 Mon Sep 17 00:00:00 2001 From: Victoria Xia Date: Thu, 17 Feb 2022 10:25:23 -0800 Subject: [PATCH 2/6] feat: add custom header support when applying migrations --- .../tools/migrations/MigrationConfig.java | 3 +- .../commands/ApplyMigrationCommand.java | 20 ++++++--- .../tools/migrations/util/MigrationsUtil.java | 37 ++++++++++++++-- .../commands/ApplyMigrationCommandTest.java | 44 +++++++++---------- .../migrations/util/MigrationsUtilTest.java | 10 ++++- 5 files changed, 80 insertions(+), 34 deletions(-) diff --git a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/MigrationConfig.java b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/MigrationConfig.java index f16036d20fd7..9d3dae909513 100644 --- a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/MigrationConfig.java +++ b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/MigrationConfig.java @@ -187,7 +187,8 @@ private static String getServiceId(final Map configs) throws Mig configs.get(SSL_KEY_PASSWORD), configs.get(SSL_KEY_ALIAS), configs.getOrDefault(SSL_ALPN, "false").equalsIgnoreCase("true"), - configs.getOrDefault(SSL_VERIFY_HOST, "true").equalsIgnoreCase("true") + configs.getOrDefault(SSL_VERIFY_HOST, "true").equalsIgnoreCase("true"), + null ); final String serviceId; try { diff --git a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/ApplyMigrationCommand.java b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/ApplyMigrationCommand.java index 0a3b19c13c67..e9707582448c 100644 --- a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/ApplyMigrationCommand.java +++ b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/ApplyMigrationCommand.java @@ -56,7 +56,7 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.ExecutionException; -import java.util.function.Function; +import java.util.function.BiFunction; import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -131,6 +131,14 @@ public class ApplyMigrationCommand extends BaseCommand { ) private List definedVars = null; + @Option( + name = {"--headers"}, + description = "Path to custom request headers file. These headers will be sent with all " + + "requests to the ksqlDB server as part of applying these migrations." + ) + @Once + private String headersFile; + @Override protected int command() { if (!validateConfigFilePresent()) { @@ -157,14 +165,14 @@ protected int command() { @VisibleForTesting int command( final MigrationConfig config, - final Function clientSupplier, + final BiFunction clientSupplier, final String migrationsDir, final Clock clock ) { // CHECKSTYLE_RULES.ON: NPathComplexity final Client ksqlClient; try { - ksqlClient = clientSupplier.apply(config); + ksqlClient = clientSupplier.apply(config, headersFile); } catch (MigrationException e) { LOGGER.error(e.getMessage()); return 1; @@ -330,9 +338,11 @@ private void executeCommands( final List commands = CommandParser.splitSql(migrationFileContent); executeCommands( - commands, ksqlClient, config, executionStart, migration, clock, previous, true); + commands, ksqlClient, config, executionStart, + migration, clock, previous, true); executeCommands( - commands, ksqlClient, config, executionStart, migration, clock, previous, false); + commands, ksqlClient, config, executionStart, + migration, clock, previous, false); } /** diff --git a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/util/MigrationsUtil.java b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/util/MigrationsUtil.java index f610c2364131..44d44e393a4d 100644 --- a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/util/MigrationsUtil.java +++ b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/util/MigrationsUtil.java @@ -18,10 +18,14 @@ import com.google.common.annotations.VisibleForTesting; import io.confluent.ksql.api.client.Client; import io.confluent.ksql.api.client.ClientOptions; +import io.confluent.ksql.properties.PropertiesUtil; import io.confluent.ksql.tools.migrations.MigrationConfig; import io.confluent.ksql.tools.migrations.MigrationException; +import io.confluent.ksql.util.KsqlException; +import java.io.File; import java.net.MalformedURLException; import java.net.URL; +import java.util.Map; public final class MigrationsUtil { @@ -31,6 +35,11 @@ private MigrationsUtil() { public static final String MIGRATIONS_COMMAND = "ksql-migrations"; public static Client getKsqlClient(final MigrationConfig config) throws MigrationException { + return getKsqlClient(config, null); + } + + public static Client getKsqlClient(final MigrationConfig config, final String headersFile) + throws MigrationException { return getKsqlClient( config.getString(MigrationConfig.KSQL_SERVER_URL), config.getString(MigrationConfig.KSQL_BASIC_AUTH_USERNAME), @@ -42,7 +51,8 @@ public static Client getKsqlClient(final MigrationConfig config) throws Migratio config.getString(MigrationConfig.SSL_KEY_PASSWORD), config.getString(MigrationConfig.SSL_KEY_ALIAS), config.getBoolean(MigrationConfig.SSL_ALPN), - config.getBoolean(MigrationConfig.SSL_VERIFY_HOST) + config.getBoolean(MigrationConfig.SSL_VERIFY_HOST), + loadRequestHeaders(headersFile) ); } @@ -59,11 +69,12 @@ public static Client getKsqlClient( final String sslKeyPassword, final String sslKeyAlias, final boolean sslAlpn, - final boolean sslVerifyHost + final boolean sslVerifyHost, + final Map requestHeaders ) { return Client.create(createClientOptions(ksqlServerUrl, username, password, sslTrustStoreLocation, sslTrustStorePassword, sslKeystoreLocation, sslKeystorePassword, - sslKeyPassword, sslKeyAlias, sslAlpn, sslVerifyHost)); + sslKeyPassword, sslKeyAlias, sslAlpn, sslVerifyHost, requestHeaders)); } @VisibleForTesting @@ -80,7 +91,8 @@ static ClientOptions createClientOptions( final String sslKeyPassword, final String sslKeyAlias, final boolean useAlpn, - final boolean verifyHost + final boolean verifyHost, + final Map requestHeaders ) { final URL url; try { @@ -111,6 +123,23 @@ static ClientOptions createClientOptions( options.setUseAlpn(useAlpn); options.setVerifyHost(verifyHost); } + + if (requestHeaders != null) { + options.setRequestHeaders(requestHeaders); + } + return options; } + + private static Map loadRequestHeaders(final String headersFile) { + if (headersFile == null || headersFile.trim().isEmpty()) { + return null; + } + + try { + return PropertiesUtil.loadProperties(new File(headersFile.trim())); + } catch (KsqlException e) { + throw new MigrationException("Could not parse headers file '" + headersFile + "'."); + } + } } diff --git a/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/commands/ApplyMigrationCommandTest.java b/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/commands/ApplyMigrationCommandTest.java index fcb72fa46ddc..ca11647de4f5 100644 --- a/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/commands/ApplyMigrationCommandTest.java +++ b/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/commands/ApplyMigrationCommandTest.java @@ -171,7 +171,7 @@ public void shouldApplyFirstMigration() throws Exception { when(versionQueryResult.get()).thenReturn(ImmutableList.of()); // When: - final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed( + final int result = command.command(config, (cfg, headers) -> ksqlClient, migrationsDir, Clock.fixed( Instant.ofEpochMilli(1000), ZoneId.systemDefault())); // Then: @@ -192,7 +192,7 @@ public void shouldApplySetUnsetCommands() throws Exception { when(versionQueryResult.get()).thenReturn(ImmutableList.of()); // When: - final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed( + final int result = command.command(config, (cfg, headers) -> ksqlClient, migrationsDir, Clock.fixed( Instant.ofEpochMilli(1000), ZoneId.systemDefault())); // Then: @@ -225,7 +225,7 @@ public void shouldApplyDefineUndefineCommands() throws Exception { ); // When: - final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed( + final int result = command.command(config, (cfg, headers) -> ksqlClient, migrationsDir, Clock.fixed( Instant.ofEpochMilli(1000), ZoneId.systemDefault())); // Then: @@ -260,7 +260,7 @@ public void shouldResetVariablesBetweenMigrations() throws Exception { givenAppliedMigration(1, NAME, MigrationState.MIGRATED); // When: - final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed( + final int result = command.command(config, (cfg, headers) -> ksqlClient, migrationsDir, Clock.fixed( Instant.ofEpochMilli(1000), ZoneId.systemDefault())); // Then: @@ -289,7 +289,7 @@ public void shouldApplyArgumentVariablesEveryMigration() throws Exception { givenAppliedMigration(1, NAME, MigrationState.MIGRATED); // When: - final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed( + final int result = command.command(config, (cfg, headers) -> ksqlClient, migrationsDir, Clock.fixed( Instant.ofEpochMilli(1000), ZoneId.systemDefault())); // Then: @@ -313,7 +313,7 @@ public void defineStatementsShouldTakePrecedenceOverArgumentVariables() throws E givenAppliedMigration(1, NAME, MigrationState.MIGRATED); // When: - final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed( + final int result = command.command(config, (cfg, headers) -> ksqlClient, migrationsDir, Clock.fixed( Instant.ofEpochMilli(1000), ZoneId.systemDefault())); // Then: @@ -333,7 +333,7 @@ public void shouldFailOnInvalidArgumentVariable() throws Exception { when(versionQueryResult.get()).thenReturn(ImmutableList.of()); // When: - final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed( + final int result = command.command(config, (cfg, headers) -> ksqlClient, migrationsDir, Clock.fixed( Instant.ofEpochMilli(1000), ZoneId.systemDefault())); // Then: @@ -350,7 +350,7 @@ public void shouldResetPropertiesBetweenMigrations() throws Exception { givenAppliedMigration(1, NAME, MigrationState.MIGRATED); // When: - final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed( + final int result = command.command(config, (cfg, headers) -> ksqlClient, migrationsDir, Clock.fixed( Instant.ofEpochMilli(1000), ZoneId.systemDefault())); // Then: @@ -371,7 +371,7 @@ public void shouldApplySecondMigration() throws Exception { givenAppliedMigration(1, NAME, MigrationState.MIGRATED); // When: - final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed( + final int result = command.command(config, (cfg, headers) -> ksqlClient, migrationsDir, Clock.fixed( Instant.ofEpochMilli(1000), ZoneId.systemDefault())); // Then: @@ -392,7 +392,7 @@ public void shouldApplyMultipleMigrations() throws Exception { givenAppliedMigration(1, NAME, MigrationState.MIGRATED); // When: - final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed( + final int result = command.command(config, (cfg, headers) -> ksqlClient, migrationsDir, Clock.fixed( Instant.ofEpochMilli(1000), ZoneId.systemDefault())); // Then: @@ -416,7 +416,7 @@ public void shouldApplyUntilVersion() throws Exception { givenAppliedMigration(1, NAME, MigrationState.MIGRATED); // When: - final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed( + final int result = command.command(config, (cfg, headers) -> ksqlClient, migrationsDir, Clock.fixed( Instant.ofEpochMilli(1000), ZoneId.systemDefault())); // Then: @@ -438,7 +438,7 @@ public void shouldApplySpecificMigration() throws Exception { givenAppliedMigration(1, NAME, MigrationState.MIGRATED); // When: - final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed( + final int result = command.command(config, (cfg, headers) -> ksqlClient, migrationsDir, Clock.fixed( Instant.ofEpochMilli(1000), ZoneId.systemDefault())); // Then: @@ -459,7 +459,7 @@ public void shouldNotApplyMigrationIfPreviousNotFinished() throws Exception { givenAppliedMigration(1, NAME, MigrationState.RUNNING); // When: - final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed( + final int result = command.command(config, (cfg, headers) -> ksqlClient, migrationsDir, Clock.fixed( Instant.ofEpochMilli(1000), ZoneId.systemDefault())); // Then: @@ -479,7 +479,7 @@ public void shouldLogErrorStateIfMigrationFails() throws Exception { when(statementResultCf.get()).thenThrow(new ExecutionException("sql rejected", new RuntimeException())); // When: - final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed( + final int result = command.command(config, (cfg, headers) -> ksqlClient, migrationsDir, Clock.fixed( Instant.ofEpochMilli(1000), ZoneId.systemDefault())); // Then: @@ -502,7 +502,7 @@ public void shouldSkipApplyIfValidateFails() throws Exception { givenAppliedMigration(1, NAME, MigrationState.MIGRATED); // When: - final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed( + final int result = command.command(config, (cfg, headers) -> ksqlClient, migrationsDir, Clock.fixed( Instant.ofEpochMilli(1000), ZoneId.systemDefault())); // Then: @@ -523,7 +523,7 @@ public void shouldNotFailIfFileDoesntFitFormat() throws Exception { assertThat(new File(migrationsDir + "/foo.sql").createNewFile(), is(true)); // When: - final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed( + final int result = command.command(config, (cfg, headers) -> ksqlClient, migrationsDir, Clock.fixed( Instant.ofEpochMilli(1000), ZoneId.systemDefault())); // Then: @@ -544,7 +544,7 @@ public void shouldFailIfMetadataNotInitialized() throws Exception { .thenThrow(new ExecutionException("Source not found", new RuntimeException())); // When: - final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed( + final int result = command.command(config, (cfg, headers) -> ksqlClient, migrationsDir, Clock.fixed( Instant.ofEpochMilli(1000), ZoneId.systemDefault())); // Then: @@ -561,7 +561,7 @@ public void shouldThrowErrorOnParsingFailure() throws Exception { when(versionQueryResult.get()).thenReturn(ImmutableList.of()); // When: - final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed( + final int result = command.command(config, (cfg, headers) -> ksqlClient, migrationsDir, Clock.fixed( Instant.ofEpochMilli(1000), ZoneId.systemDefault())); // Then: @@ -582,7 +582,7 @@ public void shouldApplyInsertStatement() throws Exception { givenAppliedMigration(1, NAME, MigrationState.MIGRATED); // When: - final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed( + final int result = command.command(config, (cfg, headers) -> ksqlClient, migrationsDir, Clock.fixed( Instant.ofEpochMilli(1000), ZoneId.systemDefault())); // Then: @@ -608,7 +608,7 @@ public void shouldApplyCreateConnectorStatement() throws Exception { givenAppliedMigration(1, NAME, MigrationState.MIGRATED); // When: - final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed( + final int result = command.command(config, (cfg, headers) -> ksqlClient, migrationsDir, Clock.fixed( Instant.ofEpochMilli(1000), ZoneId.systemDefault())); // Then: @@ -630,7 +630,7 @@ public void shouldApplyDropConnectorStatement() throws Exception { givenAppliedMigration(1, NAME, MigrationState.MIGRATED); // When: - final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed( + final int result = command.command(config, (cfg, headers) -> ksqlClient, migrationsDir, Clock.fixed( Instant.ofEpochMilli(1000), ZoneId.systemDefault())); // Then: @@ -651,7 +651,7 @@ public void shouldNotApplyOlderVersion() throws Exception { givenCurrentMigrationVersion("1"); // When: - final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed( + final int result = command.command(config, (cfg, headers) -> ksqlClient, migrationsDir, Clock.fixed( Instant.ofEpochMilli(1000), ZoneId.systemDefault())); // Then: diff --git a/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/util/MigrationsUtilTest.java b/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/util/MigrationsUtilTest.java index 3af660e7ea6e..cff0ddb4d1a8 100644 --- a/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/util/MigrationsUtilTest.java +++ b/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/util/MigrationsUtilTest.java @@ -18,8 +18,11 @@ import static io.confluent.ksql.tools.migrations.util.MigrationsUtil.createClientOptions; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; +import com.google.common.collect.ImmutableMap; import io.confluent.ksql.api.client.ClientOptions; +import java.util.Map; import org.junit.Test; public class MigrationsUtilTest { @@ -32,7 +35,7 @@ public void shouldCreateNonTlsClientOptions() { // Given: final ClientOptions clientOptions = createClientOptions(NON_TLS_URL, "user", "pass", null, "", null, - null, "", "foo", false, true); + null, "", "foo", false, true, null); // Then: assertThat(clientOptions.isUseTls(), is(false)); @@ -46,14 +49,16 @@ public void shouldCreateNonTlsClientOptions() { assertThat(clientOptions.getKeyAlias(), is("")); assertThat(clientOptions.isUseAlpn(), is(false)); assertThat(clientOptions.isVerifyHost(), is(true)); + assertThat(clientOptions.getRequestHeaders(), nullValue()); } @Test public void shouldCreateTlsClientOptions() { // Given: + final Map requestHeaders = ImmutableMap.of("h1", "v1", "h2", "v2"); final ClientOptions clientOptions = createClientOptions(TLS_URL, "user", "pass", "abc", null, null, - null, null, null, true, true); + null, null, null, true, true, requestHeaders); // Then: assertThat(clientOptions.isUseTls(), is(true)); @@ -67,5 +72,6 @@ public void shouldCreateTlsClientOptions() { assertThat(clientOptions.getKeyAlias(), is("")); assertThat(clientOptions.isUseAlpn(), is(true)); assertThat(clientOptions.isVerifyHost(), is(true)); + assertThat(clientOptions.getRequestHeaders(), is(requestHeaders)); } } From 978c28b2db7791eb00289ca3e47a5792771659d4 Mon Sep 17 00:00:00 2001 From: Victoria Xia Date: Thu, 17 Feb 2022 11:53:09 -0800 Subject: [PATCH 3/6] docs: add docs for custom headers in migrations tool --- docs/operate-and-deploy/migrations-tool.md | 24 +++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/docs/operate-and-deploy/migrations-tool.md b/docs/operate-and-deploy/migrations-tool.md index 35f9c45d326f..7bbbf15867e0 100644 --- a/docs/operate-and-deploy/migrations-tool.md +++ b/docs/operate-and-deploy/migrations-tool.md @@ -226,6 +226,7 @@ ksql-migrations {-c | --config-file} apply [ {-u | --until} ] [ {-v | --version} ] [ {-d | --define} = ] + [ --headers ] [ --dry-run ] ``` @@ -241,7 +242,8 @@ to apply: In addition to selecting a mode for `ksql-migrations apply`, you must also provide the path to the config file of your migrations project as part of the command. -If both your ksqlDB server and migration tool are version 0.18 and newer, you can define variables by passing the `--define` flag followed by a string of the form +If both your ksqlDB server and migration tool are version 0.18 and newer, you can +define variables by passing the `--define` flag followed by a string of the form `name=value` any number of times. For example, the following command ```bash @@ -307,6 +309,26 @@ containing multiple ksqlDB statements fails during the migration, it's possible some of the statements will have been run on the ksqlDB server while later statements have not. +You can optionally pass custom request headers to be sent with all ksqlDB requests +made as part of the `apply` command. In order to do so, pass the location of a +file containing the custom request headers with the `--headers` flag: + +``` +$ ksql-migrations --config-file /my/migrations/project/ksql-migrations.properties apply --next --headers /my/migrations/project/request_headers.txt +``` + +Format your headers file with one header name and value pair on each line, +separated either with a colon or an equals sign. Both of the following are valid: +``` +X-My-Custom-Header: abcdefg +X-My-Other-Custom-Header: asdfgh +``` +or +``` +X-My-Custom-Header=abcdefg +X-My-Other-Custom-Header=asdfgh +``` + View Current Migration Status ----------------------------- From 3628298788b6aaa6c109805128059b52502633fe Mon Sep 17 00:00:00 2001 From: Victoria Xia Date: Thu, 17 Feb 2022 12:35:17 -0800 Subject: [PATCH 4/6] docs: feedback --- docs/operate-and-deploy/migrations-tool.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/operate-and-deploy/migrations-tool.md b/docs/operate-and-deploy/migrations-tool.md index 7bbbf15867e0..911754769638 100644 --- a/docs/operate-and-deploy/migrations-tool.md +++ b/docs/operate-and-deploy/migrations-tool.md @@ -310,7 +310,7 @@ some of the statements will have been run on the ksqlDB server while later state have not. You can optionally pass custom request headers to be sent with all ksqlDB requests -made as part of the `apply` command. In order to do so, pass the location of a +made as part of the `apply` command by passing the location of a file containing the custom request headers with the `--headers` flag: ``` From d2f9d42c079adb00e2716fcedb610ba4a696f7a1 Mon Sep 17 00:00:00 2001 From: Victoria Xia Date: Thu, 17 Feb 2022 15:24:50 -0800 Subject: [PATCH 5/6] chore: update to empty map --- .../java/io/confluent/ksql/api/client/ClientOptions.java | 2 +- .../io/confluent/ksql/api/client/impl/ClientOptionsImpl.java | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/ClientOptions.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/ClientOptions.java index 31df5d854148..9dce21f4a9e1 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/ClientOptions.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/ClientOptions.java @@ -275,7 +275,7 @@ public interface ClientOptions { /** * Returns a copy of the custom request headers to be sent with ksqlDB requests. - * If not set, then this method returns {@code null}. + * If not set, then this method returns an empty map. * * @return custom request headers */ diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientOptionsImpl.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientOptionsImpl.java index 49380292418b..77cb1e3f56b6 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientOptionsImpl.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientOptionsImpl.java @@ -15,6 +15,7 @@ package io.confluent.ksql.api.client.impl; +import com.google.common.collect.ImmutableMap; import io.confluent.ksql.api.client.ClientOptions; import java.util.HashMap; import java.util.Map; @@ -165,7 +166,7 @@ public ClientOptions setHttp2MultiplexingLimit(final int http2MultiplexingLimit) @Override public ClientOptions setRequestHeaders(final Map requestHeaders) { - this.requestHeaders = requestHeaders; + this.requestHeaders = ImmutableMap.copyOf(requestHeaders); return this; } @@ -251,7 +252,7 @@ public int getHttp2MultiplexingLimit() { @Override public Map getRequestHeaders() { - return requestHeaders == null ? null : new HashMap<>(requestHeaders); + return requestHeaders == null ? new HashMap<>() : new HashMap<>(requestHeaders); } @Override From 81e8549ab44b2ff135764802df43c9ed5c976d20 Mon Sep 17 00:00:00 2001 From: Victoria Xia Date: Thu, 17 Feb 2022 15:39:26 -0800 Subject: [PATCH 6/6] chore: fixes --- .../io/confluent/ksql/api/client/impl/ClientOptionsImpl.java | 2 +- .../ksql/tools/migrations/util/MigrationsUtilTest.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientOptionsImpl.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientOptionsImpl.java index 77cb1e3f56b6..e8d8b7506d7c 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientOptionsImpl.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientOptionsImpl.java @@ -166,7 +166,7 @@ public ClientOptions setHttp2MultiplexingLimit(final int http2MultiplexingLimit) @Override public ClientOptions setRequestHeaders(final Map requestHeaders) { - this.requestHeaders = ImmutableMap.copyOf(requestHeaders); + this.requestHeaders = requestHeaders == null ? null : ImmutableMap.copyOf(requestHeaders); return this; } diff --git a/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/util/MigrationsUtilTest.java b/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/util/MigrationsUtilTest.java index cff0ddb4d1a8..099935b849f7 100644 --- a/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/util/MigrationsUtilTest.java +++ b/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/util/MigrationsUtilTest.java @@ -18,10 +18,10 @@ import static io.confluent.ksql.tools.migrations.util.MigrationsUtil.createClientOptions; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.nullValue; import com.google.common.collect.ImmutableMap; import io.confluent.ksql.api.client.ClientOptions; +import java.util.Collections; import java.util.Map; import org.junit.Test; @@ -49,7 +49,7 @@ public void shouldCreateNonTlsClientOptions() { assertThat(clientOptions.getKeyAlias(), is("")); assertThat(clientOptions.isUseAlpn(), is(false)); assertThat(clientOptions.isVerifyHost(), is(true)); - assertThat(clientOptions.getRequestHeaders(), nullValue()); + assertThat(clientOptions.getRequestHeaders(), is(Collections.emptyMap())); } @Test