diff --git a/docs/operate-and-deploy/migrations-tool.md b/docs/operate-and-deploy/migrations-tool.md index 35f9c45d326f..911754769638 100644 --- a/docs/operate-and-deploy/migrations-tool.md +++ b/docs/operate-and-deploy/migrations-tool.md @@ -226,6 +226,7 @@ ksql-migrations {-c | --config-file} apply [ {-u | --until} ] [ {-v | --version} ] [ {-d | --define} = ] + [ --headers ] [ --dry-run ] ``` @@ -241,7 +242,8 @@ to apply: In addition to selecting a mode for `ksql-migrations apply`, you must also provide the path to the config file of your migrations project as part of the command. -If both your ksqlDB server and migration tool are version 0.18 and newer, you can define variables by passing the `--define` flag followed by a string of the form +If both your ksqlDB server and migration tool are version 0.18 and newer, you can +define variables by passing the `--define` flag followed by a string of the form `name=value` any number of times. For example, the following command ```bash @@ -307,6 +309,26 @@ containing multiple ksqlDB statements fails during the migration, it's possible some of the statements will have been run on the ksqlDB server while later statements have not. +You can optionally pass custom request headers to be sent with all ksqlDB requests +made as part of the `apply` command by passing the location of a +file containing the custom request headers with the `--headers` flag: + +``` +$ ksql-migrations --config-file /my/migrations/project/ksql-migrations.properties apply --next --headers /my/migrations/project/request_headers.txt +``` + +Format your headers file with one header name and value pair on each line, +separated either with a colon or an equals sign. Both of the following are valid: +``` +X-My-Custom-Header: abcdefg +X-My-Other-Custom-Header: asdfgh +``` +or +``` +X-My-Custom-Header=abcdefg +X-My-Other-Custom-Header=asdfgh +``` + View Current Migration Status ----------------------------- diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/ClientOptions.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/ClientOptions.java index 59632a90bde5..9dce21f4a9e1 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/ClientOptions.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/ClientOptions.java @@ -16,6 +16,7 @@ package io.confluent.ksql.api.client; import io.confluent.ksql.api.client.impl.ClientOptionsImpl; +import java.util.Map; /** * Options for the ksqlDB {@link Client}. @@ -143,6 +144,23 @@ public interface ClientOptions { */ ClientOptions setHttp2MultiplexingLimit(int http2MultiplexingLimit); + /** + * Sets custom request headers to be sent with requests to the ksqlDB server. + * These headers are in addition to any automatic headers such as the + * authorization header. + * + *

If this method is called more than once, only the headers passed on + * the last invocation will be used. To update existing custom headers, + * use this method in combination with {@link ClientOptions#getRequestHeaders()}. + * + *

In case of overlap between these custom headers and automatic headers such + * as the authorization header, these custom headers take precedence. + * + * @param requestHeaders custom request headers + * @return a reference to this + */ + ClientOptions setRequestHeaders(Map requestHeaders); + /** * Returns the host name of the ksqlDB server to connect to. * @@ -255,6 +273,14 @@ public interface ClientOptions { */ int getHttp2MultiplexingLimit(); + /** + * Returns a copy of the custom request headers to be sent with ksqlDB requests. + * If not set, then this method returns an empty map. + * + * @return custom request headers + */ + Map getRequestHeaders(); + /** * Creates a copy of these {@code ClientOptions}. * diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java index 022f8184abdd..ae62cbcb921f 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java @@ -60,6 +60,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -556,6 +557,11 @@ private > void makeRequest( if (clientOptions.isUseBasicAuth()) { request = configureBasicAuth(request); } + if (clientOptions.getRequestHeaders() != null) { + for (final Entry entry : clientOptions.getRequestHeaders().entrySet()) { + request.putHeader(entry.getKey(), entry.getValue()); + } + } if (endRequest) { request.end(requestBody); } else { diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientOptionsImpl.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientOptionsImpl.java index 7138ed483f33..e8d8b7506d7c 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientOptionsImpl.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientOptionsImpl.java @@ -15,7 +15,10 @@ package io.confluent.ksql.api.client.impl; +import com.google.common.collect.ImmutableMap; import io.confluent.ksql.api.client.ClientOptions; +import java.util.HashMap; +import java.util.Map; import java.util.Objects; public class ClientOptionsImpl implements ClientOptions { @@ -36,6 +39,7 @@ public class ClientOptionsImpl implements ClientOptions { private String basicAuthPassword; private int executeQueryMaxResultRows = ClientOptions.DEFAULT_EXECUTE_QUERY_MAX_RESULT_ROWS; private int http2MultiplexingLimit = ClientOptions.DEFAULT_HTTP2_MULTIPLEXING_LIMIT; + private Map requestHeaders; /** * {@code ClientOptions} should be instantiated via {@link ClientOptions#create}, NOT via this @@ -53,7 +57,8 @@ private ClientOptionsImpl( final String trustStorePath, final String trustStorePassword, final String keyStorePath, final String keyStorePassword, final String keyPassword, final String keyAlias, final String basicAuthUsername, final String basicAuthPassword, - final int executeQueryMaxResultRows, final int http2MultiplexingLimit) { + final int executeQueryMaxResultRows, final int http2MultiplexingLimit, + final Map requestHeaders) { this.host = Objects.requireNonNull(host); this.port = port; this.useTls = useTls; @@ -70,6 +75,7 @@ private ClientOptionsImpl( this.basicAuthPassword = basicAuthPassword; this.executeQueryMaxResultRows = executeQueryMaxResultRows; this.http2MultiplexingLimit = http2MultiplexingLimit; + this.requestHeaders = requestHeaders; } @Override @@ -158,6 +164,12 @@ public ClientOptions setHttp2MultiplexingLimit(final int http2MultiplexingLimit) return this; } + @Override + public ClientOptions setRequestHeaders(final Map requestHeaders) { + this.requestHeaders = requestHeaders == null ? null : ImmutableMap.copyOf(requestHeaders); + return this; + } + @Override public String getHost() { return host == null ? "" : host; @@ -238,6 +250,11 @@ public int getHttp2MultiplexingLimit() { return http2MultiplexingLimit; } + @Override + public Map getRequestHeaders() { + return requestHeaders == null ? new HashMap<>() : new HashMap<>(requestHeaders); + } + @Override public ClientOptions copy() { return new ClientOptionsImpl( @@ -247,7 +264,8 @@ public ClientOptions copy() { trustStorePath, trustStorePassword, keyStorePath, keyStorePassword, keyPassword, keyAlias, basicAuthUsername, basicAuthPassword, - executeQueryMaxResultRows, http2MultiplexingLimit); + executeQueryMaxResultRows, http2MultiplexingLimit, + requestHeaders); } // CHECKSTYLE_RULES.OFF: CyclomaticComplexity @@ -275,14 +293,16 @@ public boolean equals(final Object o) { && Objects.equals(keyAlias, that.keyAlias) && Objects.equals(basicAuthUsername, that.basicAuthUsername) && Objects.equals(basicAuthPassword, that.basicAuthPassword) - && http2MultiplexingLimit == that.http2MultiplexingLimit; + && http2MultiplexingLimit == that.http2MultiplexingLimit + && Objects.equals(requestHeaders, that.requestHeaders); } @Override public int hashCode() { return Objects.hash(host, port, useTls, verifyHost, useAlpn, trustStorePath, trustStorePassword, keyStorePath, keyStorePassword, keyPassword, keyAlias, - basicAuthUsername, basicAuthPassword, executeQueryMaxResultRows, http2MultiplexingLimit); + basicAuthUsername, basicAuthPassword, executeQueryMaxResultRows, http2MultiplexingLimit, + requestHeaders); } @Override @@ -301,8 +321,9 @@ public String toString() { + ", keyAlias='" + keyAlias + '\'' + ", basicAuthUsername='" + basicAuthUsername + '\'' + ", basicAuthPassword='" + basicAuthPassword + '\'' - + ", executeQueryMaxResultRows=" + executeQueryMaxResultRows + '\'' + + ", executeQueryMaxResultRows=" + executeQueryMaxResultRows + ", http2MultiplexingLimit=" + http2MultiplexingLimit + + ", requestHeaders='" + requestHeaders + '\'' + '}'; } } diff --git a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientBasicAuthTest.java b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientBasicAuthTest.java index 7670830d20c4..d6981c852c79 100644 --- a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientBasicAuthTest.java +++ b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientBasicAuthTest.java @@ -57,9 +57,7 @@ protected KsqlRestConfig createServerConfig() { @Override protected ClientOptions createJavaClientOptions() { - return ClientOptions.create() - .setHost("localhost") - .setPort(server.getListeners().get(0).getPort()) + return super.createJavaClientOptions() .setBasicAuthCredentials(USER_WITH_ACCESS, USER_WITH_ACCESS_PWD); } diff --git a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java index 8545619eaed3..2862e68ebcd6 100644 --- a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java +++ b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java @@ -43,7 +43,6 @@ import io.confluent.ksql.api.client.exception.KsqlClientException; import io.confluent.ksql.api.client.exception.KsqlException; import io.confluent.ksql.api.client.impl.ConnectorTypeImpl; -import io.confluent.ksql.api.client.impl.ExecuteStatementResultImpl; import io.confluent.ksql.api.client.impl.StreamedQueryResultImpl; import io.confluent.ksql.api.client.util.ClientTestUtil; import io.confluent.ksql.api.client.util.ClientTestUtil.TestSubscriber; @@ -100,6 +99,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Optional; import java.util.Set; import java.util.concurrent.CountDownLatch; @@ -108,6 +108,9 @@ import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo.ConnectorState; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeDiagnosingMatcher; import org.junit.Test; import org.reactivestreams.Publisher; import org.slf4j.Logger; @@ -139,6 +142,8 @@ public class ClientTest extends BaseApiTest { protected static final org.apache.kafka.connect.runtime.rest.entities.ConnectorType SOURCE_TYPE = org.apache.kafka.connect.runtime.rest.entities.ConnectorType.SOURCE; + protected static final Map REQUEST_HEADERS = ImmutableMap.of("h1", "v1", "h2", "v2"); + protected Client javaClient; @Override @@ -1761,6 +1766,31 @@ public void setSessionVariablesWithHttpRequest() throws Exception { assertThat(testEndpoints.getLastProperties(), is(new JsonObject().put("auto.offset.reset", "earliest"))); } + @Test + public void shouldSendCustomRequestHeaders() throws Exception { + // Given: + final CommandStatusEntity entity = new CommandStatusEntity( + "CSAS;", + new CommandId("STREAM", "FOO", "CREATE"), + new CommandStatus( + CommandStatus.Status.SUCCESS, + "Success" + ), + 0L + ); + testEndpoints.setKsqlEndpointResponse(Collections.singletonList(entity)); + + // When: + javaClient.executeStatement("CSAS;").get(); + + // Then: + final List> requestHeaders = + testEndpoints.getLastApiSecurityContext().getRequestHeaders(); + for (final Entry header : REQUEST_HEADERS.entrySet()) { + assertThat(requestHeaders, hasItems(entry(header))); + } + } + protected Client createJavaClient() { return Client.create(createJavaClientOptions(), vertx); } @@ -1768,7 +1798,8 @@ protected Client createJavaClient() { protected ClientOptions createJavaClientOptions() { return ClientOptions.create() .setHost("localhost") - .setPort(server.getListeners().get(0).getPort()); + .setPort(server.getListeners().get(0).getPort()) + .setRequestHeaders(REQUEST_HEADERS); } private void verifyPushQueryServerState(final String sql) { @@ -2069,4 +2100,30 @@ public LegacySourceDescriptionEntity( this.warnings = warnings; } } + + private static Matcher> entry( + final Entry entry + ) { + return new TypeSafeDiagnosingMatcher>() { + @Override + protected boolean matchesSafely( + final Entry actual, + final Description mismatchDescription) { + if (!entry.getKey().equals(actual.getKey())) { + return false; + } + if (!entry.getValue().equals(actual.getValue())) { + return false; + } + return true; + } + + @Override + public void describeTo(final Description description) { + description.appendText(String.format( + "key: %s. value: %s", + entry.getKey(), entry.getValue())); + } + }; + } } \ No newline at end of file diff --git a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTlsTest.java b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTlsTest.java index 46e7139acb6f..ec69b0a7d370 100644 --- a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTlsTest.java +++ b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTlsTest.java @@ -59,9 +59,7 @@ protected KsqlRestConfig createServerConfig() { @Override protected ClientOptions createJavaClientOptions() { - return ClientOptions.create() - .setHost("localhost") - .setPort(server.getListeners().get(0).getPort()) + return super.createJavaClientOptions() .setUseTls(true) .setTrustStore(TRUST_STORE_PATH) .setTrustStorePassword(TRUST_STORE_PASSWORD) diff --git a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/impl/ClientOptionsImplTest.java b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/impl/ClientOptionsImplTest.java index 138a99609521..e110d2d5f8c3 100644 --- a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/impl/ClientOptionsImplTest.java +++ b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/impl/ClientOptionsImplTest.java @@ -17,6 +17,7 @@ import com.google.common.testing.EqualsTester; import io.confluent.ksql.api.client.ClientOptions; +import java.util.Collections; import org.junit.Test; public class ClientOptionsImplTest { @@ -66,6 +67,9 @@ public void shouldImplementHashCodeAndEquals() { .addEqualityGroup( ClientOptions.create().setHttp2MultiplexingLimit(5) ) + .addEqualityGroup( + ClientOptions.create().setRequestHeaders(Collections.singletonMap("h1", "v1")) + ) .testEquals(); } diff --git a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/MigrationConfig.java b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/MigrationConfig.java index f16036d20fd7..9d3dae909513 100644 --- a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/MigrationConfig.java +++ b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/MigrationConfig.java @@ -187,7 +187,8 @@ private static String getServiceId(final Map configs) throws Mig configs.get(SSL_KEY_PASSWORD), configs.get(SSL_KEY_ALIAS), configs.getOrDefault(SSL_ALPN, "false").equalsIgnoreCase("true"), - configs.getOrDefault(SSL_VERIFY_HOST, "true").equalsIgnoreCase("true") + configs.getOrDefault(SSL_VERIFY_HOST, "true").equalsIgnoreCase("true"), + null ); final String serviceId; try { diff --git a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/ApplyMigrationCommand.java b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/ApplyMigrationCommand.java index 0a3b19c13c67..e9707582448c 100644 --- a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/ApplyMigrationCommand.java +++ b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/ApplyMigrationCommand.java @@ -56,7 +56,7 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.ExecutionException; -import java.util.function.Function; +import java.util.function.BiFunction; import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -131,6 +131,14 @@ public class ApplyMigrationCommand extends BaseCommand { ) private List definedVars = null; + @Option( + name = {"--headers"}, + description = "Path to custom request headers file. These headers will be sent with all " + + "requests to the ksqlDB server as part of applying these migrations." + ) + @Once + private String headersFile; + @Override protected int command() { if (!validateConfigFilePresent()) { @@ -157,14 +165,14 @@ protected int command() { @VisibleForTesting int command( final MigrationConfig config, - final Function clientSupplier, + final BiFunction clientSupplier, final String migrationsDir, final Clock clock ) { // CHECKSTYLE_RULES.ON: NPathComplexity final Client ksqlClient; try { - ksqlClient = clientSupplier.apply(config); + ksqlClient = clientSupplier.apply(config, headersFile); } catch (MigrationException e) { LOGGER.error(e.getMessage()); return 1; @@ -330,9 +338,11 @@ private void executeCommands( final List commands = CommandParser.splitSql(migrationFileContent); executeCommands( - commands, ksqlClient, config, executionStart, migration, clock, previous, true); + commands, ksqlClient, config, executionStart, + migration, clock, previous, true); executeCommands( - commands, ksqlClient, config, executionStart, migration, clock, previous, false); + commands, ksqlClient, config, executionStart, + migration, clock, previous, false); } /** diff --git a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/util/MigrationsUtil.java b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/util/MigrationsUtil.java index f610c2364131..44d44e393a4d 100644 --- a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/util/MigrationsUtil.java +++ b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/util/MigrationsUtil.java @@ -18,10 +18,14 @@ import com.google.common.annotations.VisibleForTesting; import io.confluent.ksql.api.client.Client; import io.confluent.ksql.api.client.ClientOptions; +import io.confluent.ksql.properties.PropertiesUtil; import io.confluent.ksql.tools.migrations.MigrationConfig; import io.confluent.ksql.tools.migrations.MigrationException; +import io.confluent.ksql.util.KsqlException; +import java.io.File; import java.net.MalformedURLException; import java.net.URL; +import java.util.Map; public final class MigrationsUtil { @@ -31,6 +35,11 @@ private MigrationsUtil() { public static final String MIGRATIONS_COMMAND = "ksql-migrations"; public static Client getKsqlClient(final MigrationConfig config) throws MigrationException { + return getKsqlClient(config, null); + } + + public static Client getKsqlClient(final MigrationConfig config, final String headersFile) + throws MigrationException { return getKsqlClient( config.getString(MigrationConfig.KSQL_SERVER_URL), config.getString(MigrationConfig.KSQL_BASIC_AUTH_USERNAME), @@ -42,7 +51,8 @@ public static Client getKsqlClient(final MigrationConfig config) throws Migratio config.getString(MigrationConfig.SSL_KEY_PASSWORD), config.getString(MigrationConfig.SSL_KEY_ALIAS), config.getBoolean(MigrationConfig.SSL_ALPN), - config.getBoolean(MigrationConfig.SSL_VERIFY_HOST) + config.getBoolean(MigrationConfig.SSL_VERIFY_HOST), + loadRequestHeaders(headersFile) ); } @@ -59,11 +69,12 @@ public static Client getKsqlClient( final String sslKeyPassword, final String sslKeyAlias, final boolean sslAlpn, - final boolean sslVerifyHost + final boolean sslVerifyHost, + final Map requestHeaders ) { return Client.create(createClientOptions(ksqlServerUrl, username, password, sslTrustStoreLocation, sslTrustStorePassword, sslKeystoreLocation, sslKeystorePassword, - sslKeyPassword, sslKeyAlias, sslAlpn, sslVerifyHost)); + sslKeyPassword, sslKeyAlias, sslAlpn, sslVerifyHost, requestHeaders)); } @VisibleForTesting @@ -80,7 +91,8 @@ static ClientOptions createClientOptions( final String sslKeyPassword, final String sslKeyAlias, final boolean useAlpn, - final boolean verifyHost + final boolean verifyHost, + final Map requestHeaders ) { final URL url; try { @@ -111,6 +123,23 @@ static ClientOptions createClientOptions( options.setUseAlpn(useAlpn); options.setVerifyHost(verifyHost); } + + if (requestHeaders != null) { + options.setRequestHeaders(requestHeaders); + } + return options; } + + private static Map loadRequestHeaders(final String headersFile) { + if (headersFile == null || headersFile.trim().isEmpty()) { + return null; + } + + try { + return PropertiesUtil.loadProperties(new File(headersFile.trim())); + } catch (KsqlException e) { + throw new MigrationException("Could not parse headers file '" + headersFile + "'."); + } + } } diff --git a/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/commands/ApplyMigrationCommandTest.java b/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/commands/ApplyMigrationCommandTest.java index fcb72fa46ddc..ca11647de4f5 100644 --- a/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/commands/ApplyMigrationCommandTest.java +++ b/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/commands/ApplyMigrationCommandTest.java @@ -171,7 +171,7 @@ public void shouldApplyFirstMigration() throws Exception { when(versionQueryResult.get()).thenReturn(ImmutableList.of()); // When: - final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed( + final int result = command.command(config, (cfg, headers) -> ksqlClient, migrationsDir, Clock.fixed( Instant.ofEpochMilli(1000), ZoneId.systemDefault())); // Then: @@ -192,7 +192,7 @@ public void shouldApplySetUnsetCommands() throws Exception { when(versionQueryResult.get()).thenReturn(ImmutableList.of()); // When: - final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed( + final int result = command.command(config, (cfg, headers) -> ksqlClient, migrationsDir, Clock.fixed( Instant.ofEpochMilli(1000), ZoneId.systemDefault())); // Then: @@ -225,7 +225,7 @@ public void shouldApplyDefineUndefineCommands() throws Exception { ); // When: - final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed( + final int result = command.command(config, (cfg, headers) -> ksqlClient, migrationsDir, Clock.fixed( Instant.ofEpochMilli(1000), ZoneId.systemDefault())); // Then: @@ -260,7 +260,7 @@ public void shouldResetVariablesBetweenMigrations() throws Exception { givenAppliedMigration(1, NAME, MigrationState.MIGRATED); // When: - final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed( + final int result = command.command(config, (cfg, headers) -> ksqlClient, migrationsDir, Clock.fixed( Instant.ofEpochMilli(1000), ZoneId.systemDefault())); // Then: @@ -289,7 +289,7 @@ public void shouldApplyArgumentVariablesEveryMigration() throws Exception { givenAppliedMigration(1, NAME, MigrationState.MIGRATED); // When: - final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed( + final int result = command.command(config, (cfg, headers) -> ksqlClient, migrationsDir, Clock.fixed( Instant.ofEpochMilli(1000), ZoneId.systemDefault())); // Then: @@ -313,7 +313,7 @@ public void defineStatementsShouldTakePrecedenceOverArgumentVariables() throws E givenAppliedMigration(1, NAME, MigrationState.MIGRATED); // When: - final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed( + final int result = command.command(config, (cfg, headers) -> ksqlClient, migrationsDir, Clock.fixed( Instant.ofEpochMilli(1000), ZoneId.systemDefault())); // Then: @@ -333,7 +333,7 @@ public void shouldFailOnInvalidArgumentVariable() throws Exception { when(versionQueryResult.get()).thenReturn(ImmutableList.of()); // When: - final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed( + final int result = command.command(config, (cfg, headers) -> ksqlClient, migrationsDir, Clock.fixed( Instant.ofEpochMilli(1000), ZoneId.systemDefault())); // Then: @@ -350,7 +350,7 @@ public void shouldResetPropertiesBetweenMigrations() throws Exception { givenAppliedMigration(1, NAME, MigrationState.MIGRATED); // When: - final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed( + final int result = command.command(config, (cfg, headers) -> ksqlClient, migrationsDir, Clock.fixed( Instant.ofEpochMilli(1000), ZoneId.systemDefault())); // Then: @@ -371,7 +371,7 @@ public void shouldApplySecondMigration() throws Exception { givenAppliedMigration(1, NAME, MigrationState.MIGRATED); // When: - final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed( + final int result = command.command(config, (cfg, headers) -> ksqlClient, migrationsDir, Clock.fixed( Instant.ofEpochMilli(1000), ZoneId.systemDefault())); // Then: @@ -392,7 +392,7 @@ public void shouldApplyMultipleMigrations() throws Exception { givenAppliedMigration(1, NAME, MigrationState.MIGRATED); // When: - final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed( + final int result = command.command(config, (cfg, headers) -> ksqlClient, migrationsDir, Clock.fixed( Instant.ofEpochMilli(1000), ZoneId.systemDefault())); // Then: @@ -416,7 +416,7 @@ public void shouldApplyUntilVersion() throws Exception { givenAppliedMigration(1, NAME, MigrationState.MIGRATED); // When: - final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed( + final int result = command.command(config, (cfg, headers) -> ksqlClient, migrationsDir, Clock.fixed( Instant.ofEpochMilli(1000), ZoneId.systemDefault())); // Then: @@ -438,7 +438,7 @@ public void shouldApplySpecificMigration() throws Exception { givenAppliedMigration(1, NAME, MigrationState.MIGRATED); // When: - final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed( + final int result = command.command(config, (cfg, headers) -> ksqlClient, migrationsDir, Clock.fixed( Instant.ofEpochMilli(1000), ZoneId.systemDefault())); // Then: @@ -459,7 +459,7 @@ public void shouldNotApplyMigrationIfPreviousNotFinished() throws Exception { givenAppliedMigration(1, NAME, MigrationState.RUNNING); // When: - final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed( + final int result = command.command(config, (cfg, headers) -> ksqlClient, migrationsDir, Clock.fixed( Instant.ofEpochMilli(1000), ZoneId.systemDefault())); // Then: @@ -479,7 +479,7 @@ public void shouldLogErrorStateIfMigrationFails() throws Exception { when(statementResultCf.get()).thenThrow(new ExecutionException("sql rejected", new RuntimeException())); // When: - final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed( + final int result = command.command(config, (cfg, headers) -> ksqlClient, migrationsDir, Clock.fixed( Instant.ofEpochMilli(1000), ZoneId.systemDefault())); // Then: @@ -502,7 +502,7 @@ public void shouldSkipApplyIfValidateFails() throws Exception { givenAppliedMigration(1, NAME, MigrationState.MIGRATED); // When: - final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed( + final int result = command.command(config, (cfg, headers) -> ksqlClient, migrationsDir, Clock.fixed( Instant.ofEpochMilli(1000), ZoneId.systemDefault())); // Then: @@ -523,7 +523,7 @@ public void shouldNotFailIfFileDoesntFitFormat() throws Exception { assertThat(new File(migrationsDir + "/foo.sql").createNewFile(), is(true)); // When: - final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed( + final int result = command.command(config, (cfg, headers) -> ksqlClient, migrationsDir, Clock.fixed( Instant.ofEpochMilli(1000), ZoneId.systemDefault())); // Then: @@ -544,7 +544,7 @@ public void shouldFailIfMetadataNotInitialized() throws Exception { .thenThrow(new ExecutionException("Source not found", new RuntimeException())); // When: - final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed( + final int result = command.command(config, (cfg, headers) -> ksqlClient, migrationsDir, Clock.fixed( Instant.ofEpochMilli(1000), ZoneId.systemDefault())); // Then: @@ -561,7 +561,7 @@ public void shouldThrowErrorOnParsingFailure() throws Exception { when(versionQueryResult.get()).thenReturn(ImmutableList.of()); // When: - final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed( + final int result = command.command(config, (cfg, headers) -> ksqlClient, migrationsDir, Clock.fixed( Instant.ofEpochMilli(1000), ZoneId.systemDefault())); // Then: @@ -582,7 +582,7 @@ public void shouldApplyInsertStatement() throws Exception { givenAppliedMigration(1, NAME, MigrationState.MIGRATED); // When: - final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed( + final int result = command.command(config, (cfg, headers) -> ksqlClient, migrationsDir, Clock.fixed( Instant.ofEpochMilli(1000), ZoneId.systemDefault())); // Then: @@ -608,7 +608,7 @@ public void shouldApplyCreateConnectorStatement() throws Exception { givenAppliedMigration(1, NAME, MigrationState.MIGRATED); // When: - final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed( + final int result = command.command(config, (cfg, headers) -> ksqlClient, migrationsDir, Clock.fixed( Instant.ofEpochMilli(1000), ZoneId.systemDefault())); // Then: @@ -630,7 +630,7 @@ public void shouldApplyDropConnectorStatement() throws Exception { givenAppliedMigration(1, NAME, MigrationState.MIGRATED); // When: - final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed( + final int result = command.command(config, (cfg, headers) -> ksqlClient, migrationsDir, Clock.fixed( Instant.ofEpochMilli(1000), ZoneId.systemDefault())); // Then: @@ -651,7 +651,7 @@ public void shouldNotApplyOlderVersion() throws Exception { givenCurrentMigrationVersion("1"); // When: - final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed( + final int result = command.command(config, (cfg, headers) -> ksqlClient, migrationsDir, Clock.fixed( Instant.ofEpochMilli(1000), ZoneId.systemDefault())); // Then: diff --git a/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/util/MigrationsUtilTest.java b/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/util/MigrationsUtilTest.java index 3af660e7ea6e..099935b849f7 100644 --- a/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/util/MigrationsUtilTest.java +++ b/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/util/MigrationsUtilTest.java @@ -19,7 +19,10 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; +import com.google.common.collect.ImmutableMap; import io.confluent.ksql.api.client.ClientOptions; +import java.util.Collections; +import java.util.Map; import org.junit.Test; public class MigrationsUtilTest { @@ -32,7 +35,7 @@ public void shouldCreateNonTlsClientOptions() { // Given: final ClientOptions clientOptions = createClientOptions(NON_TLS_URL, "user", "pass", null, "", null, - null, "", "foo", false, true); + null, "", "foo", false, true, null); // Then: assertThat(clientOptions.isUseTls(), is(false)); @@ -46,14 +49,16 @@ public void shouldCreateNonTlsClientOptions() { assertThat(clientOptions.getKeyAlias(), is("")); assertThat(clientOptions.isUseAlpn(), is(false)); assertThat(clientOptions.isVerifyHost(), is(true)); + assertThat(clientOptions.getRequestHeaders(), is(Collections.emptyMap())); } @Test public void shouldCreateTlsClientOptions() { // Given: + final Map requestHeaders = ImmutableMap.of("h1", "v1", "h2", "v2"); final ClientOptions clientOptions = createClientOptions(TLS_URL, "user", "pass", "abc", null, null, - null, null, null, true, true); + null, null, null, true, true, requestHeaders); // Then: assertThat(clientOptions.isUseTls(), is(true)); @@ -67,5 +72,6 @@ public void shouldCreateTlsClientOptions() { assertThat(clientOptions.getKeyAlias(), is("")); assertThat(clientOptions.isUseAlpn(), is(true)); assertThat(clientOptions.isVerifyHost(), is(true)); + assertThat(clientOptions.getRequestHeaders(), is(requestHeaders)); } }