Skip to content

Commit

Permalink
feat: allow users to specify custom migrations dir location (#8844)
Browse files Browse the repository at this point in the history
  • Loading branch information
vcrfxia authored Mar 5, 2022
1 parent 2aad454 commit cbe447e
Show file tree
Hide file tree
Showing 11 changed files with 170 additions and 19 deletions.
19 changes: 18 additions & 1 deletion docs/reference/migrations-tool-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,4 +109,21 @@ The username that will be used to connect to the ksqlDB server. A username and

The password that will be used to connect to the ksqlDB server. A
[username](#ksqlauthbasicusername) and password will be passed as part of
HTTP basic authentication.
HTTP basic authentication.

Migrations Directory Configs
----------------------------

### `ksql.migrations.dir.override`

An optional config that allows you to specify the path to the directory
containing migrations files to be applied. This config is not needed if you
set up your migrations project using the `ksql-migrations new-project` command.

If no override is provided, the migrations directory is inferred relative
to the migrations configuration file passed when using the `ksql-migrations` tool.
Specifically, the migrations directory is inferred as a directory with name
`migrations` contained in the same directory as the migrations configuration file.
This is the default file structure created by the `ksql-migrations new-project` command.

This configuration is available starting with ksqlDB 0.26.0.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ public final class MigrationConfig extends AbstractConfig {
public static final String KSQL_MIGRATIONS_TOPIC_REPLICAS = "ksql.migrations.topic.replicas";
public static final int KSQL_MIGRATIONS_TOPIC_REPLICAS_DEFAULT = 1;

public static final String KSQL_MIGRATIONS_DIR_OVERRIDE = "ksql.migrations.dir.override";

public static final MigrationConfig DEFAULT_CONFIG =
new MigrationConfig(Collections.emptyMap(), "ksql-service-id");

Expand Down Expand Up @@ -167,6 +169,18 @@ private MigrationConfig(final Map<String, String> configs, final String id) {
Importance.MEDIUM,
"The number of replicas for the migration stream topic. It defaults to "
+ KSQL_MIGRATIONS_TOPIC_REPLICAS_DEFAULT
).define(
KSQL_MIGRATIONS_DIR_OVERRIDE,
Type.STRING,
"",
Importance.MEDIUM,
"An optional config that allows users to specify the path to the directory "
+ "containing migrations files to be applied. If empty, the migrations directory "
+ "will be inferred as relative to the migrations configuration file "
+ "passed when using the ksql-migrations tool. Specifically, the migrations "
+ "directory will be inferred as a directory with name 'migrations' contained in "
+ "the same directory as the migrations configuration file. This is the "
+ "default file structure created by the 'ksql-migrations new-project' command."
), configs, false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import static io.confluent.ksql.tools.migrations.util.CommandParser.preserveCase;
import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.getAllMigrations;
import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.getMigrationForVersion;
import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.getMigrationsDirFromConfigFile;
import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.getMigrationsDir;

import com.github.rvesse.airline.annotations.Command;
import com.github.rvesse.airline.annotations.Option;
Expand Down Expand Up @@ -156,7 +156,7 @@ protected int command() {
return command(
config,
MigrationsUtil::getKsqlClient,
getMigrationsDirFromConfigFile(getConfigFile()),
getMigrationsDir(getConfigFile(), config),
Clock.systemDefaultZone()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.getAllVersions;
import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.getFilePrefixForVersion;
import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.getMigrationForVersion;
import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.getMigrationsDirFromConfigFile;
import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.getMigrationsDir;

import com.github.rvesse.airline.annotations.Arguments;
import com.github.rvesse.airline.annotations.Command;
Expand All @@ -28,8 +28,10 @@
import com.github.rvesse.airline.annotations.restrictions.Required;
import com.github.rvesse.airline.annotations.restrictions.ranges.IntegerRange;
import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.tools.migrations.MigrationConfig;
import io.confluent.ksql.tools.migrations.MigrationException;
import io.confluent.ksql.tools.migrations.util.MigrationFile;
import io.confluent.ksql.util.KsqlException;
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
Expand Down Expand Up @@ -77,7 +79,15 @@ protected int command() {
return 1;
}

return command(getMigrationsDirFromConfigFile(getConfigFile()));
final MigrationConfig config;
try {
config = MigrationConfig.load(getConfigFile());
} catch (KsqlException | MigrationException e) {
LOGGER.error(e.getMessage());
return 1;
}

return command(getMigrationsDir(getConfigFile(), config));
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package io.confluent.ksql.tools.migrations.commands;

import static io.confluent.ksql.tools.migrations.util.MetadataUtil.getOptionalInfoForVersions;
import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.getMigrationsDirFromConfigFile;
import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.getMigrationsDir;

import com.github.rvesse.airline.annotations.Command;
import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -63,7 +63,7 @@ protected int command() {
return command(
config,
MigrationsUtil::getKsqlClient,
getMigrationsDirFromConfigFile(getConfigFile())
getMigrationsDir(getConfigFile(), config)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ private boolean tryCreatePropertiesFile(final String path, final String ksqlServ
MigrationConfig.KSQL_BASIC_AUTH_PASSWORD
);

private static final List<String> MIGRATIONS_STRUCTURE_CONFIGS = ImmutableList.of(
MigrationConfig.KSQL_MIGRATIONS_DIR_OVERRIDE
);

private static String createInitialConfig(final String ksqlServerUrl) {
final StringBuilder builder = new StringBuilder();

Expand All @@ -175,6 +179,7 @@ private static String createInitialConfig(final String ksqlServerUrl) {
appendConfigs(builder, "Migrations metadata configs", METADATA_CONFIGS);
appendConfigs(builder, "TLS configs", TLS_CONFIGS);
appendConfigs(builder, "ksqlDB server authentication configs", SERVER_AUTH_CONFIGS);
appendConfigs(builder, "Migrations directory configs", MIGRATIONS_STRUCTURE_CONFIGS);

return builder.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import static io.confluent.ksql.tools.migrations.util.MetadataUtil.validateVersionIsMigrated;
import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.computeHashForFile;
import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.getMigrationForVersion;
import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.getMigrationsDirFromConfigFile;
import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.getMigrationsDir;

import com.github.rvesse.airline.annotations.Command;
import com.github.rvesse.airline.annotations.help.Discussion;
Expand Down Expand Up @@ -71,7 +71,7 @@ protected int command() {
return command(
config,
MigrationsUtil::getKsqlClient,
getMigrationsDirFromConfigFile(getConfigFile())
getMigrationsDir(getConfigFile(), config)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

package io.confluent.ksql.tools.migrations.util;

import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.tools.migrations.MigrationConfig;
import io.confluent.ksql.tools.migrations.MigrationException;
import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -46,6 +48,32 @@ public final class MigrationsDirectoryUtil {
private MigrationsDirectoryUtil() {
}

/**
* Returns the migrations directory path based on the provided config and
* config file path. This method does not perform validation on the provided
* directory location; that responsibility is left to downstream methods.
*/
public static String getMigrationsDir(
final String configFilePath,
final MigrationConfig config
) {
final String migrationsDir = config.getString(MigrationConfig.KSQL_MIGRATIONS_DIR_OVERRIDE);
if (migrationsDir != null && !migrationsDir.isEmpty()) {
return migrationsDir;
} else {
return getMigrationsDirFromConfigFile(configFilePath);
}
}

/**
* Returns the migrations directory path assuming the default file structure
* (as created by the 'ksql-migrations new-project' command). This method should
* NOT be called directly outside of testing; prefer
* {@link MigrationsDirectoryUtil#getMigrationsDir(String, MigrationConfig)}
* instead as users can override the default location returned by this method
* in their migrations config if they wish.
*/
@VisibleForTesting
public static String getMigrationsDirFromConfigFile(final String configFilePath) {
final Path parentDir = Paths.get(configFilePath).getParent();
if (parentDir == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import static org.mockito.Mockito.verify;

import com.github.rvesse.airline.Cli;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.confluent.common.utils.IntegrationTest;
import io.confluent.ksql.integration.IntegrationTestHarness;
Expand Down Expand Up @@ -62,6 +63,7 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand All @@ -88,16 +90,19 @@
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.RuleChain;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;

@RunWith(MockitoJUnitRunner.class)
@RunWith(Parameterized.class)
@Category({IntegrationTest.class})
public class MigrationsTest {

Expand Down Expand Up @@ -126,24 +131,41 @@ public class MigrationsTest {
.around(TEST_HARNESS)
.around(REST_APP);

@Rule
public final MockitoRule mockitoRule = MockitoJUnit.rule();

@Parameterized.Parameters(name = "{1}")
public static Collection<Object[]> migrationsDirOverrides() {
return ImmutableList.of(new Object[]{false, "no dir override"}, new Object[]{true, "with dir override"});
}

private static final Cli<BaseCommand> MIGRATIONS_CLI = new Cli<>(Migrations.class);

private static final String MIGRATIONS_STREAM = "custom_migration_stream_name";
private static final String MIGRATIONS_TABLE = "custom_migration_table_name";

private static String testDir;
private static String configFilePath;

private static ConnectExecutable CONNECT;

@Mock
private AppenderSkeleton logAppender;
@Captor
private ArgumentCaptor<LoggingEvent> logCaptor;

private static String configFilePath;
private final boolean withMigrationsDirOverride;

private static ConnectExecutable CONNECT;
private String migrationsDir;

public MigrationsTest(final boolean withMigrationsDirOverride, final String testName) {
this.withMigrationsDirOverride = withMigrationsDirOverride;
}

@BeforeClass
public static void setUpClass() throws Exception {

final String testDir = Paths.get(TestUtils.tempDirectory().getAbsolutePath(), "migrations_integ_test").toString();
testDir = Paths.get(TestUtils.tempDirectory().getAbsolutePath(), "migrations_integ_test").toString();
createAndVerifyDirectoryStructure(testDir);

configFilePath = Paths.get(testDir, MigrationsDirectoryUtil.MIGRATIONS_CONFIG_FILE).toString();
Expand Down Expand Up @@ -199,18 +221,35 @@ public static void setUpClass() throws Exception {
@AfterClass
public static void classTearDown() {
CONNECT.shutdown();
REST_APP.getPersistentQueries().forEach(str -> makeKsqlRequest("TERMINATE " + str + ";"));
REST_APP.closePersistentQueries();
}

@Before
public void setUp() {
public void setUp() throws Exception {
if (withMigrationsDirOverride) {
migrationsDir = Paths.get(testDir, "my/custom/migrations_dir").toString();
assertThat(new File(migrationsDir).mkdirs(), is(true));

writeAdditionalConfigs(configFilePath, ImmutableMap.of(
MigrationConfig.KSQL_MIGRATIONS_DIR_OVERRIDE,
migrationsDir
));
} else {
migrationsDir = MigrationsDirectoryUtil.getMigrationsDirFromConfigFile(configFilePath);
}

initializeAndVerifyMetadataStreamAndTable(configFilePath);
waitForMetadataTableReady();
}

@After
public void tearDown() {
cleanAndVerify(configFilePath);

// reset ksql server state in preparation for next run
makeKsqlRequest("DROP CONNECTOR C;");
REST_APP.closePersistentQueries();
REST_APP.dropSourcesExcept();
}

@Test
Expand All @@ -225,6 +264,7 @@ private void shouldApplyMigrations() throws Exception {
1,
"foo FOO fO0",
configFilePath,
migrationsDir,
"CREATE STREAM ${streamName} (A STRING) WITH (KAFKA_TOPIC='FOO', PARTITIONS=1, VALUE_FORMAT='JSON');\n" +
"-- let's create some connectors!!!\n" +
"CREATE SOURCE CONNECTOR C WITH ('connector.class'='org.apache.kafka.connect.tools.MockSourceConnector');\n" +
Expand All @@ -238,6 +278,7 @@ private void shouldApplyMigrations() throws Exception {
2,
"bar_bar_BAR",
configFilePath,
migrationsDir,
"CREATE OR REPLACE STREAM ${streamName} (A STRING, B INT) WITH (KAFKA_TOPIC='FOO', PARTITIONS=1, VALUE_FORMAT='JSON');"
+ "ALTER STREAM ${streamName} ADD COLUMN C BIGINT;" +
"/* add some '''data''' to FOO */" +
Expand Down Expand Up @@ -405,7 +446,7 @@ private static void createAndVerifyDirectoryStructure(final String testDir) thro

// verify config file contents
final List<String> lines = Files.readAllLines(configFile.toPath());
assertThat(lines, hasSize(22));
assertThat(lines, hasSize(25));
assertThat(lines.get(0), is(MigrationConfig.KSQL_SERVER_URL + "=" + REST_APP.getHttpsListener().toString()));
}

Expand Down Expand Up @@ -609,6 +650,7 @@ private static void createMigrationFile(
final int version,
final String name,
final String configFilePath,
final String migrationsDir,
final String content
) throws IOException {
// use `create` to create empty file
Expand All @@ -617,7 +659,7 @@ private static void createMigrationFile(

// validate file created
final File filePath = new File(Paths.get(
MigrationsDirectoryUtil.getMigrationsDirFromConfigFile(configFilePath),
migrationsDir,
String.format("/V00000%d__%s.sql", version, name.replace(' ', '_'))
).toString());
assertThat(filePath.exists(), is(true));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,10 @@ public class NewMigrationCommandTest {
"\n" +
"# ksqlDB server authentication configs:\n" +
"# ksql.auth.basic.username=\n" +
"# ksql.auth.basic.password=\n";
"# ksql.auth.basic.password=\n" +
"\n" +
"# Migrations directory configs:\n" +
"# ksql.migrations.dir.override=\n";

@Rule
public TemporaryFolder folder = KsqlTestFolder.temporaryFolder();
Expand Down
Loading

0 comments on commit cbe447e

Please sign in to comment.