Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(migrations): add ability to apply specific migration version #7137

Merged
merged 5 commits into from
Mar 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@
package io.confluent.ksql.tools.migrations.commands;

import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.getAllMigrations;
import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.getMigrationForVersion;
import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.getMigrationsDirFromConfigFile;

import com.github.rvesse.airline.annotations.Command;
import com.github.rvesse.airline.annotations.Option;
import com.github.rvesse.airline.annotations.restrictions.MutuallyExclusiveWith;
import com.github.rvesse.airline.annotations.restrictions.RequireOnlyOne;
import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.tools.migrations.Migration;
Expand All @@ -34,7 +35,9 @@
import io.confluent.ksql.util.RetryUtil;
import java.time.Clock;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand All @@ -56,24 +59,33 @@ public class ApplyMigrationCommand extends BaseCommand {
name = {"-a", "--all"},
description = "run all available migrations"
)
@MutuallyExclusiveWith(tag = "target")
@RequireOnlyOne(tag = "target")
private boolean all;

@Option(
title = "next",
name = {"-n", "--next"},
description = "migrate the next available version"
)
@MutuallyExclusiveWith(tag = "target")
@RequireOnlyOne(tag = "target")
private boolean next;

@Option(
title = "version",
title = "untilVersion",
name = {"-u", "--until"},
arity = 1,
description = "migrate until the specified version"
)
@MutuallyExclusiveWith(tag = "target")
@RequireOnlyOne(tag = "target")
private int untilVersion;

@Option(
title = "untilVersion",
name = {"-v", "--version"},
arity = 1,
description = "apply the migration with the specified version"
)
@RequireOnlyOne(tag = "target")
private int version;

@Override
Expand Down Expand Up @@ -105,8 +117,12 @@ int command(
final String migrationsDir,
final Clock clock
) {
if (untilVersion < 0) {
LOGGER.error("'until' migration version must be positive. Got: {}", untilVersion);
return 1;
}
if (version < 0) {
LOGGER.error("Optional migration version must be positive. Got: {}", version);
LOGGER.error("migration version to apply must be positive. Got: {}", version);
return 1;
}

Expand All @@ -125,7 +141,7 @@ int command(

boolean success;
try {
success = ValidateMigrationsCommand.validate(config, migrationsDir, ksqlClient)
success = validateCurrentState(config, ksqlClient, migrationsDir)
&& apply(config, ksqlClient, migrationsDir, clock);
} catch (MigrationException e) {
LOGGER.error(e.getMessage());
Expand All @@ -151,15 +167,7 @@ private boolean apply(
LOGGER.info("Loading migration files");
final List<Migration> migrations;
try {
migrations = getAllMigrations(migrationsDir).stream()
.filter(migration -> {
if (version > 0) {
return migration.getVersion() <= version && migration.getVersion() >= minimumVersion;
} else {
return migration.getVersion() >= minimumVersion;
}
})
.collect(Collectors.toList());
migrations = loadMigrationsToApply(migrationsDir, minimumVersion);
} catch (MigrationException e) {
LOGGER.error(e.getMessage());
return false;
Expand All @@ -168,7 +176,7 @@ private boolean apply(
if (migrations.size() == 0) {
LOGGER.info("No eligible migrations found.");
} else {
LOGGER.info(migrations.size() + " migration files loaded.");
LOGGER.info(migrations.size() + " migration file(s) loaded.");
}

for (Migration migration : migrations) {
Expand All @@ -177,22 +185,60 @@ private boolean apply(
}
previous = Integer.toString(migration.getVersion());
}

return true;
}

private List<Migration> loadMigrationsToApply(
final String migrationsDir,
final int minimumVersion
) {
if (version > 0) {
final Optional<Migration> migration =
getMigrationForVersion(String.valueOf(version), migrationsDir);
if (!migration.isPresent()) {
throw new MigrationException("No migration file with version " + version + " exists.");
}
return Collections.singletonList(migration.get());
}

final List<Migration> migrations = getAllMigrations(migrationsDir).stream()
.filter(migration -> {
if (migration.getVersion() < minimumVersion) {
return false;
}
if (untilVersion > 0) {
return migration.getVersion() <= untilVersion;
} else {
return true;
}
})
.collect(Collectors.toList());

if (next) {
if (migrations.size() == 0) {
throw new MigrationException("No eligible migrations found.");
}
return Collections.singletonList(migrations.get(0));
}

return migrations;
}

private boolean applyMigration(
final MigrationConfig config,
final Client ksqlClient,
final Migration migration,
final Clock clock,
final String previous
) {
LOGGER.info("Applying " + migration.getName() + " version " + migration.getVersion());
LOGGER.info("Applying migration version {}: {}", migration.getVersion(), migration.getName());
final String migrationFileContent =
MigrationsDirectoryUtil.getFileContentsForName(migration.getFilepath());
LOGGER.info(migrationFileContent);
LOGGER.info("Migration file contents:\n{}", migrationFileContent);

if (dryRun) {
LOGGER.info("Dry run complete. No migrations were actually applied.");
return true;
}

Expand Down Expand Up @@ -307,4 +353,13 @@ private boolean updateState(
protected Logger getLogger() {
return LOGGER;
}

private static boolean validateCurrentState(
final MigrationConfig config,
final Client ksqlClient,
final String migrationsDir
) {
LOGGER.info("Validating current migration state before applying new migrations");
return ValidateMigrationsCommand.validate(config, migrationsDir, ksqlClient);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
package io.confluent.ksql.tools.migrations.commands;

import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.getAllVersions;
import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.getFilePathForVersion;
import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.getFilePrefixForVersion;
import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.getMigrationForVersion;
import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.getMigrationsDirFromConfigFile;

import com.github.rvesse.airline.annotations.Arguments;
Expand All @@ -26,6 +26,7 @@
import com.github.rvesse.airline.annotations.help.Examples;
import com.github.rvesse.airline.annotations.restrictions.Required;
import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.tools.migrations.Migration;
import io.confluent.ksql.tools.migrations.MigrationException;
import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -103,17 +104,17 @@ private boolean validateVersionDoesNotAlreadyExist(final String migrationsDir) {
return true;
}

final Optional<String> existingFile;
final Optional<Migration> existingMigration;
try {
existingFile = getFilePathForVersion(String.valueOf(version), migrationsDir);
existingMigration = getMigrationForVersion(String.valueOf(version), migrationsDir);
} catch (MigrationException e) {
LOGGER.error(e.getMessage());
return false;
}

if (existingFile.isPresent()) {
if (existingMigration.isPresent()) {
LOGGER.error("Found existing migrations file for version {}: {}",
version, existingFile.get());
version, existingMigration.get().getFilepath());
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import static io.confluent.ksql.tools.migrations.util.MetadataUtil.getLatestMigratedVersion;
import static io.confluent.ksql.tools.migrations.util.MetadataUtil.validateVersionIsMigrated;
import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.computeHashForFile;
import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.getFilePathForVersion;
import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.getMigrationForVersion;
import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.getMigrationsDirFromConfigFile;

import com.github.rvesse.airline.annotations.Command;
Expand Down Expand Up @@ -134,7 +134,7 @@ static boolean validate(

final String filename;
try {
filename = getFilePathForVersion(version, migrationsDir).get();
filename = getMigrationForVersion(version, migrationsDir).get().getFilepath();
} catch (MigrationException | NoSuchElementException e) {
LOGGER.error("No migrations file found for version with status {}. Version: {}",
MigrationState.MIGRATED, version);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.nio.file.Paths;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -61,7 +60,7 @@ public static String getFilePrefixForVersion(final String version) {
return "V" + StringUtils.leftPad(version, 6, "0");
}

public static Optional<String> getFilePathForVersion(
public static Optional<Migration> getMigrationForVersion(
final String version,
final String migrationsDir
) {
Expand All @@ -77,16 +76,16 @@ public static Optional<String> getFilePathForVersion(
throw new MigrationException("Failed to retrieve files from " + migrationsDir);
}

final List<String> matches = Arrays.stream(names)
final List<Migration> matches = Arrays.stream(names)
.filter(name -> name.startsWith(prefix))
.map(name -> getMigrationFromFilename(migrationsDir, name))
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toList());
if (matches.size() == 1) {
return Optional.of(Paths.get(migrationsDir, matches.get(0)).toString());
} else if (matches.size() == 0) {
return Optional.empty();
} else {
throw new MigrationException("Found multiple migration files for version " + version);
}
// throw on multiple matches
validateMigrationVersionsUnique(matches);

return matches.size() > 0 ? Optional.of(matches.get(0)) : Optional.empty();
}

public static String getFileContentsForName(final String filename) {
Expand Down Expand Up @@ -141,30 +140,59 @@ public static List<Migration> getAllMigrations(final String migrationsDir) {
.filter(name -> !new File(name).isDirectory())
.collect(Collectors.toList());

final List<Migration> migrations = new ArrayList<>();
for (final String filename : filenames) {
final Matcher matcher = MIGRATION_FILE_MATCHER.matcher(filename);
if (!matcher.find()) {
LOGGER.warn("Skipping file does not match expected migration file pattern "
+ "'V<six digit number>__<name>.sql': {}", filename);
continue;
}
final List<Migration> migrations = filenames.stream()
.map(name -> getMigrationFromFilename(migrationsDir, name))
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toList());

final int version = Integer.parseInt(matcher.group(1));
if (version <= 0) {
throw new MigrationException(
"Migration file versions must be positive. Found: " + filename);
}
validateMigrationVersionsUnique(migrations);

final String description = matcher.group(2).replace('_', ' ');
return migrations;
}

migrations.add(new Migration(
version,
description,
migrationsDir + "/" + filename
));
private static Optional<Migration> getMigrationFromFilename(
final String migrationsDir,
final String filename
) {
final Matcher matcher = MIGRATION_FILE_MATCHER.matcher(filename);
if (!matcher.find()) {
LOGGER.warn("Skipping file does not match expected migration file pattern "
+ "'V<six digit number>__<name>.sql': {}", filename);
return Optional.empty();
}

return migrations;
final int version = Integer.parseInt(matcher.group(1));
if (version <= 0) {
throw new MigrationException(
"Migration file versions must be positive. Found: " + filename);
}

final String description = matcher.group(2).replace('_', ' ');

return Optional.of(new Migration(
version,
description,
Paths.get(migrationsDir, filename).toString()
));
}

private static void validateMigrationVersionsUnique(final List<Migration> migrations) {
if (migrations.size() == 0) {
return;
}

Migration previous = migrations.get(0);
for (int i = 1; i < migrations.size(); i++) {
if (migrations.get(i).getVersion() == previous.getVersion()) {
throw new MigrationException(String.format(
"Found multiple migration files with the same version. Version: %d. Files: '%s', '%s'",
migrations.get(i).getVersion(),
migrations.get(i).getFilepath(),
previous.getFilepath()
));
}
previous = migrations.get(i);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public void testApply() throws IOException {
() -> makeKsqlQuery("SELECT * FROM migration_schema_versions WHERE VERSION_KEY='CURRENT';").size(),
is(1)
);
final int status = MIGRATIONS_CLI.parse("--config-file", configFilePath, "apply").run();
final int status = MIGRATIONS_CLI.parse("--config-file", configFilePath, "apply", "-a").run();
assertThat(status, is(0));

// verify FOO and BAR were registered
Expand Down
Loading