Skip to content

Commit

Permalink
feat(migrations): implement clean command (#7153)
Browse files Browse the repository at this point in the history
  • Loading branch information
vcrfxia authored Mar 4, 2021
1 parent e2c06dc commit 2e546b6
Show file tree
Hide file tree
Showing 7 changed files with 537 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ public final class MigrationConfig extends AbstractConfig {
public static final String KSQL_SERVER_URL = "ksql.server.url";

public static final String KSQL_MIGRATIONS_STREAM_NAME = "ksql.migrations.stream.name";
public static final String KSQL_MIGRATIONS_STREAM_NAME_DEFAULT = "migration_events";
public static final String KSQL_MIGRATIONS_STREAM_NAME_DEFAULT = "MIGRATION_EVENTS";
public static final String KSQL_MIGRATIONS_TABLE_NAME = "ksql.migrations.table.name";
public static final String KSQL_MIGRATIONS_TABLE_NAME_DEFAULT = "migration_schema_versions";
public static final String KSQL_MIGRATIONS_TABLE_NAME_DEFAULT = "MIGRATION_SCHEMA_VERSIONS";
public static final String KSQL_MIGRATIONS_STREAM_TOPIC_NAME =
"ksql.migrations.stream.topic.name";
public static final String KSQL_MIGRATIONS_TABLE_TOPIC_NAME = "ksql.migrations.table.topic.name";
Expand Down Expand Up @@ -73,16 +73,16 @@ private MigrationConfig(final Map<String, String> configs, final String id) {
id + "ksql_" + configs
.getOrDefault(KSQL_MIGRATIONS_STREAM_NAME, KSQL_MIGRATIONS_STREAM_NAME_DEFAULT),
Importance.MEDIUM,
"The name of the migration stream topic. It defaults to " + id + "ksql_" + configs
.getOrDefault(KSQL_MIGRATIONS_STREAM_NAME, KSQL_MIGRATIONS_STREAM_NAME_DEFAULT)
"The name of the migration stream topic. It defaults to "
+ "'<ksql_service_id>ksql_<migrations_stream_name>'"
).define(
KSQL_MIGRATIONS_TABLE_TOPIC_NAME,
Type.STRING,
id + "ksql_" + configs
.getOrDefault(KSQL_MIGRATIONS_TABLE_NAME, KSQL_MIGRATIONS_TABLE_NAME_DEFAULT),
Importance.MEDIUM,
"The name of the migration table topic. It defaults to" + id + "ksql_" + configs
.getOrDefault(KSQL_MIGRATIONS_TABLE_NAME, KSQL_MIGRATIONS_TABLE_NAME_DEFAULT)
"The name of the migration table topic. It defaults to "
+ "'<ksql_service_id>ksql_<migrations_table_name>'"
).define(
KSQL_MIGRATIONS_TOPIC_REPLICAS,
Type.INT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,26 @@
package io.confluent.ksql.tools.migrations.commands;

import com.github.rvesse.airline.annotations.Command;
import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.QueryInfo;
import io.confluent.ksql.api.client.SourceDescription;
import io.confluent.ksql.api.client.StreamInfo;
import io.confluent.ksql.api.client.TableInfo;
import io.confluent.ksql.tools.migrations.MigrationConfig;
import io.confluent.ksql.tools.migrations.MigrationException;
import io.confluent.ksql.tools.migrations.util.MigrationsUtil;
import io.confluent.ksql.tools.migrations.util.ServerVersionUtil;
import io.confluent.ksql.util.KsqlException;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Command(
name = "new",
name = "clean",
description = "Cleans all resources related to migrations. WARNING: this is not reversible!"
)
public class CleanMigrationsCommand extends BaseCommand {
Expand All @@ -29,12 +44,175 @@ public class CleanMigrationsCommand extends BaseCommand {

@Override
protected int command() {
throw new UnsupportedOperationException();
if (!validateConfigFilePresent()) {
return 1;
}

final MigrationConfig config;
try {
config = MigrationConfig.load(configFile);
} catch (KsqlException | MigrationException e) {
LOGGER.error(e.getMessage());
return 1;
}

return command(config, MigrationsUtil::getKsqlClient);
}

@VisibleForTesting
int command(
final MigrationConfig config,
final Function<MigrationConfig, Client> clientSupplier
) {
final String streamName = config.getString(MigrationConfig.KSQL_MIGRATIONS_STREAM_NAME);
final String tableName = config.getString(MigrationConfig.KSQL_MIGRATIONS_TABLE_NAME);

final Client ksqlClient;
try {
ksqlClient = clientSupplier.apply(config);
} catch (MigrationException e) {
LOGGER.error(e.getMessage());
return 1;
}

if (ServerVersionUtil.serverVersionCompatible(ksqlClient, config)
&& deleteMigrationsTable(ksqlClient, tableName)
&& deleteMigrationsStream(ksqlClient, streamName)) {
LOGGER.info("Migrations metadata cleaned successfully");
ksqlClient.close();
return 0;
} else {
ksqlClient.close();
return 1;
}
}

@Override
protected Logger getLogger() {
return LOGGER;
}

private boolean deleteMigrationsTable(final Client ksqlClient, final String tableName) {
try {
if (!sourceExists(ksqlClient, tableName, true)) {
// nothing to delete
return true;
}

final SourceDescription tableInfo = getSourceInfo(ksqlClient, tableName, true);
terminateQueryForTable(ksqlClient, tableInfo);
dropSource(ksqlClient, tableName, true);

return true;
} catch (MigrationException e) {
LOGGER.error(e.getMessage());
return false;
}
}

private boolean deleteMigrationsStream(final Client ksqlClient, final String streamName) {
try {
if (!sourceExists(ksqlClient, streamName, false)) {
// nothing to delete
return true;
}

dropSource(ksqlClient, streamName, false);
return true;
} catch (MigrationException e) {
LOGGER.error(e.getMessage());
return false;
}
}

private static boolean sourceExists(
final Client ksqlClient,
final String sourceName,
final boolean isTable
) {
try {
if (isTable) {
final List<TableInfo> tables = ksqlClient.listTables().get();
return tables.stream()
.anyMatch(tableInfo -> tableInfo.getName().equalsIgnoreCase(sourceName));
} else {
final List<StreamInfo> streams = ksqlClient.listStreams().get();
return streams.stream()
.anyMatch(streamInfo -> streamInfo.getName().equalsIgnoreCase(sourceName));
}
} catch (InterruptedException | ExecutionException e) {
throw new MigrationException(String.format(
"Failed to check for presence of metadata %s '%s': %s",
isTable ? "table" : "stream",
sourceName,
e.getMessage()));
}
}

/**
* Returns the source description, assuming the source exists.
*/
private static SourceDescription getSourceInfo(
final Client ksqlClient,
final String sourceName,
final boolean isTable
) {
try {
return ksqlClient.describeSource(sourceName).get();
} catch (InterruptedException | ExecutionException e) {
throw new MigrationException(String.format("Failed to describe metadata %s '%s': %s",
isTable ? "table" : "stream",
sourceName,
e.getMessage()));
}
}

private static void terminateQueryForTable(
final Client ksqlClient,
final SourceDescription tableDesc
) {
final List<QueryInfo> queries = tableDesc.writeQueries();
if (queries.size() == 0) {
LOGGER.info("Found 0 queries writing to metadata table");
return;
}

if (queries.size() > 1) {
throw new MigrationException("Found multiple queries writing to metadata table. Query IDs: "
+ queries.stream()
.map(QueryInfo::getId)
.collect(Collectors.joining("', '", "'", "'.")));
}

final String queryId = queries.get(0).getId();
LOGGER.info("Found 1 query writing to metadata table. Query ID: {}", queryId);

LOGGER.info("Terminating query with ID: {}", queryId);
try {
ksqlClient.executeStatement("TERMINATE " + queryId + ";").get();
} catch (InterruptedException | ExecutionException e) {
throw new MigrationException(String.format(
"Failed to terminate query populating metadata table. Query ID: %s. Error: %s",
queryId, e.getMessage()));
}
}

private static void dropSource(
final Client ksqlClient,
final String sourceName,
final boolean isTable
) {
final String sourceType = isTable ? "table" : "stream";
LOGGER.info("Dropping migrations metadata {}: {}", sourceType, sourceName);
try {
final String sql = String.format("DROP %s %s DELETE TOPIC;",
sourceType.toUpperCase(), sourceName);
ksqlClient.executeStatement(sql).get();
} catch (InterruptedException | ExecutionException e) {
throw new MigrationException(String.format("Failed to drop metadata %s '%s': %s",
sourceType,
sourceName,
e.getMessage()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import com.github.rvesse.airline.annotations.Command;
import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.ServerInfo;
import io.confluent.ksql.tools.migrations.MigrationConfig;
import io.confluent.ksql.tools.migrations.MigrationException;
import io.confluent.ksql.tools.migrations.util.MigrationsUtil;
Expand Down Expand Up @@ -58,8 +57,12 @@ private String createEventStream(final String name, final String topic, final in
+ ");\n";
}

private String createVersionTable(final String name, final String topic) {
return "CREATE TABLE " + name + "\n"
private String createVersionTable(
final String tableName,
final String streamName,
final String topic
) {
return "CREATE TABLE " + tableName + "\n"
+ " WITH (\n"
+ " KAFKA_TOPIC='" + topic + "'\n"
+ " )\n"
Expand All @@ -73,7 +76,7 @@ private String createVersionTable(final String name, final String topic) {
+ " latest_by_offset(completed_on) AS completed_on, \n"
+ " latest_by_offset(previous) AS previous, \n"
+ " latest_by_offset(error_reason) AS error_reason \n"
+ " FROM migration_events \n"
+ " FROM " + streamName + " \n"
+ " GROUP BY version_key;\n";
}

Expand Down Expand Up @@ -108,6 +111,7 @@ int command(
);
final String versionTableCommand = createVersionTable(
tableName,
streamName,
config.getString(MigrationConfig.KSQL_MIGRATIONS_TABLE_TOPIC_NAME)
);

Expand All @@ -119,10 +123,10 @@ int command(
return 1;
}

if (serverVersionCompatible(ksqlClient, config)
if (ServerVersionUtil.serverVersionCompatible(ksqlClient, config)
&& tryCreate(ksqlClient, eventStreamCommand, streamName, true)
&& tryCreate(ksqlClient, versionTableCommand, tableName, false)) {
LOGGER.info("Schema metadata initialized successfully");
LOGGER.info("Migrations metadata initialized successfully");
ksqlClient.close();
} else {
ksqlClient.close();
Expand All @@ -132,28 +136,6 @@ && tryCreate(ksqlClient, versionTableCommand, tableName, false)) {
return 0;
}

private static boolean serverVersionCompatible(
final Client ksqlClient,
final MigrationConfig config
) {
final String ksqlServerUrl = config.getString(MigrationConfig.KSQL_SERVER_URL);
final ServerInfo serverInfo;
try {
serverInfo = ServerVersionUtil.getServerInfo(ksqlClient, ksqlServerUrl);
} catch (MigrationException e) {
LOGGER.error("Failed to get server info to verify version compatibility: {}", e.getMessage());
return false;
}

final String serverVersion = serverInfo.getServerVersion();
try {
return ServerVersionUtil.isSupportedVersion(serverVersion);
} catch (MigrationException e) {
LOGGER.warn(e.getMessage() + ". Proceeding anyway.");
return true;
}
}

private static boolean tryCreate(
final Client client,
final String command,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.ServerInfo;
import io.confluent.ksql.tools.migrations.MigrationConfig;
import io.confluent.ksql.tools.migrations.MigrationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -84,6 +85,28 @@ public static boolean versionSupportsMultiKeyPullQuery(final String ksqlServerVe
return version.isAtLeast(6, 1, 0, 14);
}

public static boolean serverVersionCompatible(
final Client ksqlClient,
final MigrationConfig config
) {
final String ksqlServerUrl = config.getString(MigrationConfig.KSQL_SERVER_URL);
final ServerInfo serverInfo;
try {
serverInfo = getServerInfo(ksqlClient, ksqlServerUrl);
} catch (MigrationException e) {
LOGGER.error("Failed to get server info to verify version compatibility: {}", e.getMessage());
return false;
}

final String serverVersion = serverInfo.getServerVersion();
try {
return isSupportedVersion(serverVersion);
} catch (MigrationException e) {
LOGGER.warn(e.getMessage() + ". Proceeding anyway.");
return true;
}
}

private static class KsqlServerVersion {

private static final Pattern VERSION_PATTERN = Pattern.compile("v?([0-9]+)\\.([0-9]+)\\..*");
Expand Down
Loading

0 comments on commit 2e546b6

Please sign in to comment.