Skip to content

Commit

Permalink
feat(migrations): add ability to apply specific migration version (#7137
Browse files Browse the repository at this point in the history
)
  • Loading branch information
vcrfxia authored Mar 3, 2021
1 parent 679ce9a commit 608cb5e
Show file tree
Hide file tree
Showing 7 changed files with 167 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@
package io.confluent.ksql.tools.migrations.commands;

import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.getAllMigrations;
import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.getMigrationForVersion;
import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.getMigrationsDirFromConfigFile;

import com.github.rvesse.airline.annotations.Command;
import com.github.rvesse.airline.annotations.Option;
import com.github.rvesse.airline.annotations.restrictions.MutuallyExclusiveWith;
import com.github.rvesse.airline.annotations.restrictions.RequireOnlyOne;
import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.tools.migrations.Migration;
Expand All @@ -34,7 +35,9 @@
import io.confluent.ksql.util.RetryUtil;
import java.time.Clock;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand All @@ -56,24 +59,33 @@ public class ApplyMigrationCommand extends BaseCommand {
name = {"-a", "--all"},
description = "run all available migrations"
)
@MutuallyExclusiveWith(tag = "target")
@RequireOnlyOne(tag = "target")
private boolean all;

@Option(
title = "next",
name = {"-n", "--next"},
description = "migrate the next available version"
)
@MutuallyExclusiveWith(tag = "target")
@RequireOnlyOne(tag = "target")
private boolean next;

@Option(
title = "version",
title = "untilVersion",
name = {"-u", "--until"},
arity = 1,
description = "migrate until the specified version"
)
@MutuallyExclusiveWith(tag = "target")
@RequireOnlyOne(tag = "target")
private int untilVersion;

@Option(
title = "untilVersion",
name = {"-v", "--version"},
arity = 1,
description = "apply the migration with the specified version"
)
@RequireOnlyOne(tag = "target")
private int version;

@Override
Expand Down Expand Up @@ -105,8 +117,12 @@ int command(
final String migrationsDir,
final Clock clock
) {
if (untilVersion < 0) {
LOGGER.error("'until' migration version must be positive. Got: {}", untilVersion);
return 1;
}
if (version < 0) {
LOGGER.error("Optional migration version must be positive. Got: {}", version);
LOGGER.error("migration version to apply must be positive. Got: {}", version);
return 1;
}

Expand All @@ -125,7 +141,7 @@ int command(

boolean success;
try {
success = ValidateMigrationsCommand.validate(config, migrationsDir, ksqlClient)
success = validateCurrentState(config, ksqlClient, migrationsDir)
&& apply(config, ksqlClient, migrationsDir, clock);
} catch (MigrationException e) {
LOGGER.error(e.getMessage());
Expand All @@ -151,15 +167,7 @@ private boolean apply(
LOGGER.info("Loading migration files");
final List<Migration> migrations;
try {
migrations = getAllMigrations(migrationsDir).stream()
.filter(migration -> {
if (version > 0) {
return migration.getVersion() <= version && migration.getVersion() >= minimumVersion;
} else {
return migration.getVersion() >= minimumVersion;
}
})
.collect(Collectors.toList());
migrations = loadMigrationsToApply(migrationsDir, minimumVersion);
} catch (MigrationException e) {
LOGGER.error(e.getMessage());
return false;
Expand All @@ -168,7 +176,7 @@ private boolean apply(
if (migrations.size() == 0) {
LOGGER.info("No eligible migrations found.");
} else {
LOGGER.info(migrations.size() + " migration files loaded.");
LOGGER.info(migrations.size() + " migration file(s) loaded.");
}

for (Migration migration : migrations) {
Expand All @@ -177,22 +185,60 @@ private boolean apply(
}
previous = Integer.toString(migration.getVersion());
}

return true;
}

private List<Migration> loadMigrationsToApply(
final String migrationsDir,
final int minimumVersion
) {
if (version > 0) {
final Optional<Migration> migration =
getMigrationForVersion(String.valueOf(version), migrationsDir);
if (!migration.isPresent()) {
throw new MigrationException("No migration file with version " + version + " exists.");
}
return Collections.singletonList(migration.get());
}

final List<Migration> migrations = getAllMigrations(migrationsDir).stream()
.filter(migration -> {
if (migration.getVersion() < minimumVersion) {
return false;
}
if (untilVersion > 0) {
return migration.getVersion() <= untilVersion;
} else {
return true;
}
})
.collect(Collectors.toList());

if (next) {
if (migrations.size() == 0) {
throw new MigrationException("No eligible migrations found.");
}
return Collections.singletonList(migrations.get(0));
}

return migrations;
}

private boolean applyMigration(
final MigrationConfig config,
final Client ksqlClient,
final Migration migration,
final Clock clock,
final String previous
) {
LOGGER.info("Applying " + migration.getName() + " version " + migration.getVersion());
LOGGER.info("Applying migration version {}: {}", migration.getVersion(), migration.getName());
final String migrationFileContent =
MigrationsDirectoryUtil.getFileContentsForName(migration.getFilepath());
LOGGER.info(migrationFileContent);
LOGGER.info("Migration file contents:\n{}", migrationFileContent);

if (dryRun) {
LOGGER.info("Dry run complete. No migrations were actually applied.");
return true;
}

Expand Down Expand Up @@ -307,4 +353,13 @@ private boolean updateState(
protected Logger getLogger() {
return LOGGER;
}

private static boolean validateCurrentState(
final MigrationConfig config,
final Client ksqlClient,
final String migrationsDir
) {
LOGGER.info("Validating current migration state before applying new migrations");
return ValidateMigrationsCommand.validate(config, migrationsDir, ksqlClient);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
package io.confluent.ksql.tools.migrations.commands;

import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.getAllVersions;
import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.getFilePathForVersion;
import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.getFilePrefixForVersion;
import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.getMigrationForVersion;
import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.getMigrationsDirFromConfigFile;

import com.github.rvesse.airline.annotations.Arguments;
Expand All @@ -26,6 +26,7 @@
import com.github.rvesse.airline.annotations.help.Examples;
import com.github.rvesse.airline.annotations.restrictions.Required;
import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.tools.migrations.Migration;
import io.confluent.ksql.tools.migrations.MigrationException;
import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -103,17 +104,17 @@ private boolean validateVersionDoesNotAlreadyExist(final String migrationsDir) {
return true;
}

final Optional<String> existingFile;
final Optional<Migration> existingMigration;
try {
existingFile = getFilePathForVersion(String.valueOf(version), migrationsDir);
existingMigration = getMigrationForVersion(String.valueOf(version), migrationsDir);
} catch (MigrationException e) {
LOGGER.error(e.getMessage());
return false;
}

if (existingFile.isPresent()) {
if (existingMigration.isPresent()) {
LOGGER.error("Found existing migrations file for version {}: {}",
version, existingFile.get());
version, existingMigration.get().getFilepath());
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import static io.confluent.ksql.tools.migrations.util.MetadataUtil.getLatestMigratedVersion;
import static io.confluent.ksql.tools.migrations.util.MetadataUtil.validateVersionIsMigrated;
import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.computeHashForFile;
import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.getFilePathForVersion;
import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.getMigrationForVersion;
import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.getMigrationsDirFromConfigFile;

import com.github.rvesse.airline.annotations.Command;
Expand Down Expand Up @@ -134,7 +134,7 @@ static boolean validate(

final String filename;
try {
filename = getFilePathForVersion(version, migrationsDir).get();
filename = getMigrationForVersion(version, migrationsDir).get().getFilepath();
} catch (MigrationException | NoSuchElementException e) {
LOGGER.error("No migrations file found for version with status {}. Version: {}",
MigrationState.MIGRATED, version);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.nio.file.Paths;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -61,7 +60,7 @@ public static String getFilePrefixForVersion(final String version) {
return "V" + StringUtils.leftPad(version, 6, "0");
}

public static Optional<String> getFilePathForVersion(
public static Optional<Migration> getMigrationForVersion(
final String version,
final String migrationsDir
) {
Expand All @@ -77,16 +76,16 @@ public static Optional<String> getFilePathForVersion(
throw new MigrationException("Failed to retrieve files from " + migrationsDir);
}

final List<String> matches = Arrays.stream(names)
final List<Migration> matches = Arrays.stream(names)
.filter(name -> name.startsWith(prefix))
.map(name -> getMigrationFromFilename(migrationsDir, name))
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toList());
if (matches.size() == 1) {
return Optional.of(Paths.get(migrationsDir, matches.get(0)).toString());
} else if (matches.size() == 0) {
return Optional.empty();
} else {
throw new MigrationException("Found multiple migration files for version " + version);
}
// throw on multiple matches
validateMigrationVersionsUnique(matches);

return matches.size() > 0 ? Optional.of(matches.get(0)) : Optional.empty();
}

public static String getFileContentsForName(final String filename) {
Expand Down Expand Up @@ -141,30 +140,59 @@ public static List<Migration> getAllMigrations(final String migrationsDir) {
.filter(name -> !new File(name).isDirectory())
.collect(Collectors.toList());

final List<Migration> migrations = new ArrayList<>();
for (final String filename : filenames) {
final Matcher matcher = MIGRATION_FILE_MATCHER.matcher(filename);
if (!matcher.find()) {
LOGGER.warn("Skipping file does not match expected migration file pattern "
+ "'V<six digit number>__<name>.sql': {}", filename);
continue;
}
final List<Migration> migrations = filenames.stream()
.map(name -> getMigrationFromFilename(migrationsDir, name))
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toList());

final int version = Integer.parseInt(matcher.group(1));
if (version <= 0) {
throw new MigrationException(
"Migration file versions must be positive. Found: " + filename);
}
validateMigrationVersionsUnique(migrations);

final String description = matcher.group(2).replace('_', ' ');
return migrations;
}

migrations.add(new Migration(
version,
description,
migrationsDir + "/" + filename
));
private static Optional<Migration> getMigrationFromFilename(
final String migrationsDir,
final String filename
) {
final Matcher matcher = MIGRATION_FILE_MATCHER.matcher(filename);
if (!matcher.find()) {
LOGGER.warn("Skipping file does not match expected migration file pattern "
+ "'V<six digit number>__<name>.sql': {}", filename);
return Optional.empty();
}

return migrations;
final int version = Integer.parseInt(matcher.group(1));
if (version <= 0) {
throw new MigrationException(
"Migration file versions must be positive. Found: " + filename);
}

final String description = matcher.group(2).replace('_', ' ');

return Optional.of(new Migration(
version,
description,
Paths.get(migrationsDir, filename).toString()
));
}

private static void validateMigrationVersionsUnique(final List<Migration> migrations) {
if (migrations.size() == 0) {
return;
}

Migration previous = migrations.get(0);
for (int i = 1; i < migrations.size(); i++) {
if (migrations.get(i).getVersion() == previous.getVersion()) {
throw new MigrationException(String.format(
"Found multiple migration files with the same version. Version: %d. Files: '%s', '%s'",
migrations.get(i).getVersion(),
migrations.get(i).getFilepath(),
previous.getFilepath()
));
}
previous = migrations.get(i);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public void testApply() throws IOException {
() -> makeKsqlQuery("SELECT * FROM migration_schema_versions WHERE VERSION_KEY='CURRENT';").size(),
is(1)
);
final int status = MIGRATIONS_CLI.parse("--config-file", configFilePath, "apply").run();
final int status = MIGRATIONS_CLI.parse("--config-file", configFilePath, "apply", "-a").run();
assertThat(status, is(0));

// verify FOO and BAR were registered
Expand Down
Loading

0 comments on commit 608cb5e

Please sign in to comment.